Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: return from deliver tx upon timeout #1927

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gno.land/pkg/gnoland/node_inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
cfg.TMConfig.ProxyApp,
cfg.TMConfig.ABCI,
cfg.TMConfig.DBDir(),
cfg.TMConfig.RPC.TimeoutBroadcastTxCommit,

Check warning on line 133 in gno.land/pkg/gnoland/node_inmemory.go

View check run for this annotation

Codecov / codecov/patch

gno.land/pkg/gnoland/node_inmemory.go#L133

Added line #L133 was not covered by tests
)

// Create genesis factory
Expand Down
27 changes: 25 additions & 2 deletions tm2/pkg/bft/abci/client/local_client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package abcicli

import (
"context"
"sync"
"time"

abci "github.com/gnolang/gno/tm2/pkg/bft/abci/types"
"github.com/gnolang/gno/tm2/pkg/service"
Expand All @@ -19,15 +21,18 @@ type localClient struct {
mtx *sync.Mutex
abci.Application
Callback

timeout time.Duration
}

func NewLocalClient(mtx *sync.Mutex, app abci.Application) *localClient {
func NewLocalClient(mtx *sync.Mutex, app abci.Application, timeout time.Duration) *localClient {
if mtx == nil {
mtx = new(sync.Mutex)
}
cli := &localClient{
mtx: mtx,
Application: app,
timeout: timeout,
}
cli.BaseService = *service.NewBaseService(nil, "localClient", cli)
return cli
Expand Down Expand Up @@ -79,7 +84,25 @@ func (app *localClient) DeliverTxAsync(req abci.RequestDeliverTx) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.DeliverTx(req)
ctx := context.Background()
if app.timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, app.timeout)
defer cancel()
}

respCh := make(chan abci.ResponseDeliverTx, 1)
go func() {
respCh <- app.Application.DeliverTx(req)
close(respCh)
}()

var res abci.ResponseDeliverTx
select {
case <-ctx.Done():
return nil
case res = <-respCh:
}
return app.completeRequest(req, res)
}

Expand Down
5 changes: 4 additions & 1 deletion tm2/pkg/bft/abci/types/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package abci

import (
"context"
"time"

"github.com/gnolang/gno/tm2/pkg/crypto"
Expand All @@ -14,7 +15,9 @@ type Request interface {
AssertRequest()
}

type RequestBase struct{}
type RequestBase struct {
Ctx context.Context
}

func (RequestBase) AssertRequest() {}

Expand Down
5 changes: 3 additions & 2 deletions tm2/pkg/bft/consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S

// one for mempool, one for consensus
mtx := new(sync.Mutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app, thisConfig.RPC.TimeoutBroadcastTxCommit)
proxyAppConnCon := abcicli.NewLocalClient(mtx, app, thisConfig.RPC.TimeoutBroadcastTxCommit)

// Make Mempool
mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0, state.ConsensusParams.Block.MaxTxBytes)
Expand Down Expand Up @@ -649,6 +649,7 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF
app.InitChain(abci.RequestInitChain{Validators: vals})
// sm.SaveState(stateDB,state) //height 1's validatorsInfo already saved in LoadStateFromDBOrGenesisDoc above

thisConfig.RPC.TimeoutBroadcastTxCommit = time.Second * 10
css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, app)
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
Expand Down
3 changes: 3 additions & 0 deletions tm2/pkg/bft/consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {
config.Consensus.CreateEmptyBlocks = false
state, privVals := randGenesisState(1, false, 10)
app := NewCounterApplication()
config.RPC.TimeoutBroadcastTxCommit = time.Second * 10
cs := newConsensusStateWithConfig(config, state, privVals[0], app)
assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
Expand All @@ -56,6 +57,7 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
config.Consensus.CreateEmptyBlocksInterval = ensureTimeout
state, privVals := randGenesisState(1, false, 10)
app := NewCounterApplication()
config.RPC.TimeoutBroadcastTxCommit = time.Second * 10
cs := newConsensusStateWithConfig(config, state, privVals[0], app)
assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
Expand All @@ -80,6 +82,7 @@ func TestMempoolProgressInHigherRound(t *testing.T) {
config.Consensus.CreateEmptyBlocks = false
state, privVals := randGenesisState(1, false, 10)
app := NewCounterApplication()
config.RPC.TimeoutBroadcastTxCommit = time.Second * 10
cs := newConsensusStateWithConfig(config, state, privVals[0], app)
assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
Expand Down
1 change: 1 addition & 0 deletions tm2/pkg/bft/consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cnscfg.Consensu
config.ProxyApp,
config.ABCI,
config.DBDir(),
0,
)
proxyApp := appconn.NewAppConns(clientCreator)
err = proxyApp.Start()
Expand Down
8 changes: 4 additions & 4 deletions tm2/pkg/bft/consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
kvstoreApp := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_a", nBlocks, mode)))
defer kvstoreApp.Close()

clientCreator2 := proxy.NewLocalClientCreator(kvstoreApp)
clientCreator2 := proxy.NewLocalClientCreatorWithTimeout(kvstoreApp, config.RPC.TimeoutBroadcastTxCommit)
if nBlocks > 0 {
// run nBlocks against a new client to build up the app state.
// use a throwaway tendermint state
Expand Down Expand Up @@ -788,7 +788,7 @@ func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State, c
// run the whole chain against this client to build up the tendermint state
app := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode)))
defer app.Close()
clientCreator := proxy.NewLocalClientCreator(app)
clientCreator := proxy.NewLocalClientCreatorWithTimeout(app, config.RPC.TimeoutBroadcastTxCommit)
proxyApp := appconn.NewAppConns(clientCreator)
if err := proxyApp.Start(); err != nil {
panic(err)
Expand Down Expand Up @@ -850,7 +850,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
// - 0x03
{
app := &badApp{numBlocks: 3, allHashesAreWrong: true}
clientCreator := proxy.NewLocalClientCreator(app)
clientCreator := proxy.NewLocalClientCreatorWithTimeout(app, config.RPC.TimeoutBroadcastTxCommit)
proxyApp := appconn.NewAppConns(clientCreator)
err := proxyApp.Start()
require.NoError(t, err)
Expand All @@ -868,7 +868,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
// - RANDOM HASH
{
app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true}
clientCreator := proxy.NewLocalClientCreator(app)
clientCreator := proxy.NewLocalClientCreatorWithTimeout(app, config.RPC.TimeoutBroadcastTxCommit)
proxyApp := appconn.NewAppConns(clientCreator)
err := proxyApp.Start()
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion tm2/pkg/bft/consensus/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
sm.SaveState(stateDB, state)
blockStore := store.NewBlockStore(blockStoreDB)

proxyApp := appconn.NewAppConns(proxy.NewLocalClientCreator(app))
proxyApp := appconn.NewAppConns(proxy.NewLocalClientCreatorWithTimeout(app, config.RPC.TimeoutBroadcastTxCommit))
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return errors.Wrap(err, "failed to start proxy app connections")
Expand Down
1 change: 1 addition & 0 deletions tm2/pkg/bft/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func DefaultNewNode(config *cfg.Config, logger *slog.Logger) (*Node, error) {
config.ProxyApp,
config.ABCI,
config.DBDir(),
config.RPC.TimeoutBroadcastTxCommit,
)

return NewNode(config,
Expand Down
2 changes: 1 addition & 1 deletion tm2/pkg/bft/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
n, err := NewNode(config,
privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()),
nodeKey,
proxy.DefaultClientCreator(nil, config.ProxyApp, config.ABCI, config.DBDir()),
proxy.DefaultClientCreator(nil, config.ProxyApp, config.ABCI, config.DBDir(), config.RPC.TimeoutBroadcastTxCommit),
DefaultGenesisDocProviderFunc(config),
DefaultDBProvider,
log.NewTestingLogger(t),
Expand Down
31 changes: 19 additions & 12 deletions tm2/pkg/bft/proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package proxy

import (
"sync"
"time"

abcicli "github.com/gnolang/gno/tm2/pkg/bft/abci/client"
"github.com/gnolang/gno/tm2/pkg/bft/abci/example/counter"
Expand All @@ -18,43 +19,49 @@ type ClientCreator interface {
// local proxy uses a mutex on an in-proc app

type localClientCreator struct {
mtx *sync.Mutex
app abci.Application
mtx *sync.Mutex
app abci.Application
timeout time.Duration
}

func NewLocalClientCreator(app abci.Application) ClientCreator {
return NewLocalClientCreatorWithTimeout(app, 0)
}

func NewLocalClientCreatorWithTimeout(app abci.Application, timeout time.Duration) ClientCreator {
return &localClientCreator{
mtx: new(sync.Mutex),
app: app,
mtx: new(sync.Mutex),
app: app,
timeout: timeout,
}
}

func (l *localClientCreator) NewABCIClient() (abcicli.Client, error) {
return abcicli.NewLocalClient(l.mtx, l.app), nil
return abcicli.NewLocalClient(l.mtx, l.app, l.timeout), nil
}

//-----------------------------------------------------------------
// DefaultClientCreator

// Returns the local application, or constructs a new one via proxy.
// This function is meant to work with config fields.
func DefaultClientCreator(local abci.Application, proxy string, transport, dbDir string) ClientCreator {
func DefaultClientCreator(local abci.Application, proxy string, transport, dbDir string, timeout time.Duration) ClientCreator {
if local != nil {
// local applications (ignores other arguments)
return NewLocalClientCreator(local)
return NewLocalClientCreatorWithTimeout(local, timeout)
} else {
switch proxy {
// default mock applications
case "mock://counter":
return NewLocalClientCreator(counter.NewCounterApplication(false))
return NewLocalClientCreatorWithTimeout(counter.NewCounterApplication(false), timeout)
case "mock://counter_serial":
return NewLocalClientCreator(counter.NewCounterApplication(true))
return NewLocalClientCreatorWithTimeout(counter.NewCounterApplication(true), timeout)
case "mock://kvstore":
return NewLocalClientCreator(kvstore.NewKVStoreApplication())
return NewLocalClientCreatorWithTimeout(kvstore.NewKVStoreApplication(), timeout)
case "mock://persistent_kvstore":
return NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(dbDir))
return NewLocalClientCreatorWithTimeout(kvstore.NewPersistentKVStoreApplication(dbDir), timeout)
case "mock://noop":
return NewLocalClientCreator(abci.NewBaseApplication())
return NewLocalClientCreatorWithTimeout(abci.NewBaseApplication(), timeout)
default:
// socket transport applications
panic("proxy scheme not yet supported: " + proxy)
Expand Down
2 changes: 1 addition & 1 deletion tm2/pkg/bft/rpc/test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func NewTendermint(app abci.Application, opts *Options) *nm.Node {
pvKeyFile := config.PrivValidatorKeyFile()
pvKeyStateFile := config.PrivValidatorStateFile()
pv := privval.LoadOrGenFilePV(pvKeyFile, pvKeyStateFile)
papp := proxy.NewLocalClientCreator(app)
papp := proxy.NewLocalClientCreatorWithTimeout(app, config.RPC.TimeoutBroadcastTxCommit)
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
if err != nil {
panic(err)
Expand Down
11 changes: 10 additions & 1 deletion tm2/pkg/bft/state/execution.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package state

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/gnolang/gno/tm2/pkg/amino"
abci "github.com/gnolang/gno/tm2/pkg/bft/abci/types"
Expand Down Expand Up @@ -250,7 +252,14 @@ func execBlockOnProxyApp(

// Run txs of block.
for _, tx := range block.Txs {
proxyAppConn.DeliverTxAsync(abci.RequestDeliverTx{Tx: tx})
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
proxyAppConn.DeliverTxAsync(
abci.RequestDeliverTx{
RequestBase: abci.RequestBase{Ctx: ctx},
Tx: tx,
},
)
cancel()
if err := proxyAppConn.Error(); err != nil {
return nil, err
}
Expand Down
Loading