diff --git a/abci/client/local_client.go b/abci/client/local_client.go index c30427e37..65e3c4e5d 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -93,12 +93,24 @@ func (app *localClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes { ) } -func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes { - res := app.Application.CheckTx(req) - return app.callback( - types.ToRequestCheckTx(req), - types.ToResponseCheckTx(res), - ) +func (app *localClient) CheckTxAsync(params types.RequestCheckTx) *ReqRes { + req := types.ToRequestCheckTx(params) + reqRes := NewReqRes(req) + + app.Application.CheckTxAsync(params, func(r types.ResponseCheckTx) { + res := types.ToResponseCheckTx(r) + app.Callback(req, res) + reqRes.Response = res + reqRes.Done() + reqRes.SetDone() + + // Notify reqRes listener if set + if cb := reqRes.GetCallback(); cb != nil { + cb(res) + } + }) + + return reqRes } func (app *localClient) BeginRecheckTxAsync(req types.RequestBeginRecheckTx) *ReqRes { @@ -251,7 +263,7 @@ func (app *localClient) DeliverTxSync(req types.RequestDeliverTx) (*types.Respon } func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) { - res := app.Application.CheckTx(req) + res := app.Application.CheckTxSync(req) return &res, nil } @@ -355,6 +367,7 @@ func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRe func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes { reqRes := NewReqRes(req) reqRes.Response = res + reqRes.Done() reqRes.SetDone() return reqRes } diff --git a/abci/example/counter/counter.go b/abci/example/counter/counter.go index d5dae6d5a..3c9225e0e 100644 --- a/abci/example/counter/counter.go +++ b/abci/example/counter/counter.go @@ -62,7 +62,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli return types.ResponseDeliverTx{Code: code.CodeTypeOK} } -func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx { +func (app *Application) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx { + return app.checkTx(req) +} + +func (app *Application) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) { + callback(app.checkTx(req)) +} + +func (app *Application) checkTx(req types.RequestCheckTx) types.ResponseCheckTx { if app.serial { if len(req.Tx) > 8 { return types.ResponseCheckTx{ diff --git a/abci/example/kvstore/kvstore.go b/abci/example/kvstore/kvstore.go index 62cd455e0..4b1e76553 100644 --- a/abci/example/kvstore/kvstore.go +++ b/abci/example/kvstore/kvstore.go @@ -117,7 +117,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events} } -func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx { +func (app *Application) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx { + return app.checkTx(req) +} + +func (app *Application) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) { + callback(app.checkTx(req)) +} + +func (app *Application) checkTx(req types.RequestCheckTx) types.ResponseCheckTx { return types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1} } diff --git a/abci/example/kvstore/persistent_kvstore.go b/abci/example/kvstore/persistent_kvstore.go index 255298862..bc269bf37 100644 --- a/abci/example/kvstore/persistent_kvstore.go +++ b/abci/example/kvstore/persistent_kvstore.go @@ -79,8 +79,12 @@ func (app *PersistentKVStoreApplication) DeliverTx(req types.RequestDeliverTx) t return app.app.DeliverTx(req) } -func (app *PersistentKVStoreApplication) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx { - return app.app.CheckTx(req) +func (app *PersistentKVStoreApplication) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx { + return app.app.CheckTxSync(req) +} + +func (app *PersistentKVStoreApplication) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) { + app.app.CheckTxAsync(req, callback) } func (app *PersistentKVStoreApplication) BeginRecheckTx(req types.RequestBeginRecheckTx) types.ResponseBeginRecheckTx { diff --git a/abci/server/socket_server.go b/abci/server/socket_server.go index 7711e4fae..caa0c4222 100644 --- a/abci/server/socket_server.go +++ b/abci/server/socket_server.go @@ -207,7 +207,7 @@ func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types res := s.app.DeliverTx(*r.DeliverTx) responses <- types.ToResponseDeliverTx(res) case *types.Request_CheckTx: - res := s.app.CheckTx(*r.CheckTx) + res := s.app.CheckTxSync(*r.CheckTx) responses <- types.ToResponseCheckTx(res) case *types.Request_Commit: res := s.app.Commit() diff --git a/abci/types/application.go b/abci/types/application.go index a0a17ee61..37ecae3d9 100644 --- a/abci/types/application.go +++ b/abci/types/application.go @@ -4,6 +4,8 @@ import ( context "golang.org/x/net/context" ) +type CheckTxCallback func(ResponseCheckTx) + // Application is an interface that enables any finite, deterministic state machine // to be driven by a blockchain-based replication engine via the ABCI. // All methods take a RequestXxx argument and return a ResponseXxx argument, @@ -15,7 +17,8 @@ type Application interface { Query(RequestQuery) ResponseQuery // Query for state // Mempool Connection - CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool + CheckTxSync(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool + CheckTxAsync(RequestCheckTx, CheckTxCallback) // Asynchronously validate a tx for the mempool BeginRecheckTx(RequestBeginRecheckTx) ResponseBeginRecheckTx // Signals the beginning of rechecking EndRecheckTx(RequestEndRecheckTx) ResponseEndRecheckTx // Signals the end of rechecking @@ -57,10 +60,22 @@ func (BaseApplication) DeliverTx(req RequestDeliverTx) ResponseDeliverTx { return ResponseDeliverTx{Code: CodeTypeOK} } -func (BaseApplication) CheckTx(req RequestCheckTx) ResponseCheckTx { +func (BaseApplication) CheckTxSync(req RequestCheckTx) ResponseCheckTx { return ResponseCheckTx{Code: CodeTypeOK} } +func (BaseApplication) CheckTxAsync(req RequestCheckTx, callback CheckTxCallback) { + callback(ResponseCheckTx{Code: CodeTypeOK}) +} + +func (BaseApplication) BeginRecheckTx(req RequestBeginRecheckTx) ResponseBeginRecheckTx { + return ResponseBeginRecheckTx{Code: CodeTypeOK} +} + +func (BaseApplication) EndRecheckTx(req RequestEndRecheckTx) ResponseEndRecheckTx { + return ResponseEndRecheckTx{Code: CodeTypeOK} +} + func (BaseApplication) Commit() ResponseCommit { return ResponseCommit{} } @@ -81,14 +96,6 @@ func (BaseApplication) EndBlock(req RequestEndBlock) ResponseEndBlock { return ResponseEndBlock{} } -func (BaseApplication) BeginRecheckTx(req RequestBeginRecheckTx) ResponseBeginRecheckTx { - return ResponseBeginRecheckTx{Code: CodeTypeOK} -} - -func (BaseApplication) EndRecheckTx(req RequestEndRecheckTx) ResponseEndRecheckTx { - return ResponseEndRecheckTx{Code: CodeTypeOK} -} - func (BaseApplication) ListSnapshots(req RequestListSnapshots) ResponseListSnapshots { return ResponseListSnapshots{} } @@ -140,7 +147,18 @@ func (app *GRPCApplication) DeliverTx(ctx context.Context, req *RequestDeliverTx } func (app *GRPCApplication) CheckTx(ctx context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) { - res := app.app.CheckTx(*req) + res := app.app.CheckTxSync(*req) + return &res, nil +} + +func (app *GRPCApplication) BeginRecheckTx(ctx context.Context, req *RequestBeginRecheckTx) ( + *ResponseBeginRecheckTx, error) { + res := app.app.BeginRecheckTx(*req) + return &res, nil +} + +func (app *GRPCApplication) EndRecheckTx(ctx context.Context, req *RequestEndRecheckTx) (*ResponseEndRecheckTx, error) { + res := app.app.EndRecheckTx(*req) return &res, nil } @@ -169,17 +187,6 @@ func (app *GRPCApplication) EndBlock(ctx context.Context, req *RequestEndBlock) return &res, nil } -func (app *GRPCApplication) BeginRecheckTx(ctx context.Context, req *RequestBeginRecheckTx) ( - *ResponseBeginRecheckTx, error) { - res := app.app.BeginRecheckTx(*req) - return &res, nil -} - -func (app *GRPCApplication) EndRecheckTx(ctx context.Context, req *RequestEndRecheckTx) (*ResponseEndRecheckTx, error) { - res := app.app.EndRecheckTx(*req) - return &res, nil -} - func (app *GRPCApplication) ListSnapshots( ctx context.Context, req *RequestListSnapshots) (*ResponseListSnapshots, error) { res := app.app.ListSnapshots(*req) diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index d86d06164..64417dae9 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -332,10 +332,14 @@ func (app *testApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx return abci.ResponseDeliverTx{Events: []abci.Event{}} } -func (app *testApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { +func (app *testApp) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx { return abci.ResponseCheckTx{} } +func (app *testApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) { + callback(abci.ResponseCheckTx{}) +} + func (app *testApp) Commit() abci.ResponseCommit { return abci.ResponseCommit{} } diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index ee36d419b..0cb82aa9d 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -103,7 +103,7 @@ func deliverTxsRange(cs *State, start, end int) { for i := start; i < end; i++ { txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(i)) - err := assertMempool(cs.txNotifier).CheckTx(txBytes, nil, mempl.TxInfo{}) + _, err := assertMempool(cs.txNotifier).CheckTxSync(txBytes, mempl.TxInfo{}) if err != nil { panic(fmt.Sprintf("Error after CheckTx: %v", err)) } @@ -167,13 +167,13 @@ func TestMempoolRmBadTx(t *testing.T) { // Try to send the tx through the mempool. // CheckTx should not err, but the app should return a bad abci code // and the tx should get removed from the pool - err := assertMempool(cs.txNotifier).CheckTx(txBytes, func(r *abci.Response) { + err := assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(r *abci.Response) { if r.GetCheckTx().Code != code.CodeTypeBadNonce { t.Errorf("expected checktx to return bad nonce, got %v", r) return } checkTxRespCh <- struct{}{} - }, mempl.TxInfo{}) + }) if err != nil { t.Errorf("error after CheckTx: %v", err) return @@ -239,7 +239,15 @@ func (app *CounterApplication) DeliverTx(req abci.RequestDeliverTx) abci.Respons return abci.ResponseDeliverTx{Code: code.CodeTypeOK} } -func (app *CounterApplication) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { +func (app *CounterApplication) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx { + return app.checkTx(req) +} + +func (app *CounterApplication) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) { + callback(app.checkTx(req)) +} + +func (app *CounterApplication) checkTx(req abci.RequestCheckTx) abci.ResponseCheckTx { txValue := txAsUint64(req.Tx) app.mempoolTxCountMtx.Lock() defer app.mempoolTxCountMtx.Unlock() diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index c9d0adb08..d4bd4a610 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -220,7 +220,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // send a tx - if err := assertMempool(css[3].txNotifier).CheckTx([]byte{1, 2, 3}, nil, mempl.TxInfo{}); err != nil { + if _, err := assertMempool(css[3].txNotifier).CheckTxSync([]byte{1, 2, 3}, mempl.TxInfo{}); err != nil { t.Error(err) } @@ -548,7 +548,7 @@ func waitForAndValidateBlock( err := validateBlock(newBlock, activeVals) assert.Nil(t, err) for _, tx := range txs { - err := assertMempool(css[j].txNotifier).CheckTx(tx, nil, mempl.TxInfo{}) + _, err := assertMempool(css[j].txNotifier).CheckTxSync(tx, mempl.TxInfo{}) assert.Nil(t, err) } }, css) diff --git a/consensus/replay_stubs.go b/consensus/replay_stubs.go index adab0edf8..cdeaf701f 100644 --- a/consensus/replay_stubs.go +++ b/consensus/replay_stubs.go @@ -18,7 +18,10 @@ var _ mempl.Mempool = emptyMempool{} func (emptyMempool) Lock() {} func (emptyMempool) Unlock() {} func (emptyMempool) Size() int { return 0 } -func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error { +func (emptyMempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) { + return nil, nil +} +func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error { return nil } func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 6191776a4..5125facd9 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -114,7 +114,7 @@ func sendTxs(ctx context.Context, cs *State) { return default: tx := []byte{byte(i)} - if err := assertMempool(cs.txNotifier).CheckTx(tx, nil, mempl.TxInfo{}); err != nil { + if _, err := assertMempool(cs.txNotifier).CheckTxSync(tx, mempl.TxInfo{}); err != nil { panic(err) } i++ @@ -452,7 +452,7 @@ func TestSimulateValidatorsChange(t *testing.T) { valPubKey1ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey1) assert.Nil(t, err) newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower) - err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil, mempl.TxInfo{}) + _, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx1, mempl.TxInfo{}) assert.Nil(t, err) }) @@ -466,15 +466,13 @@ func TestSimulateValidatorsChange(t *testing.T) { updatePubKey1ABCI, err := cryptoenc.PubKeyToProto(updateValidatorPubKey1) require.NoError(t, err) updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25) - err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil, mempl.TxInfo{}) + _, err = assertMempool(css[0].txNotifier).CheckTxSync(updateValidatorTx1, mempl.TxInfo{}) assert.Nil(t, err) }) // height 4 height++ incrementHeight(vss...) - - // re-calculate vss newVss := make([]*validatorStub, nVals+1) copy(newVss, vss[:nVals+1]) sort.Sort(ValidatorStubsByPower(newVss)) @@ -504,14 +502,14 @@ func TestSimulateValidatorsChange(t *testing.T) { newVal2ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey2) require.NoError(t, err) newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower) - err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil, mempl.TxInfo{}) + _, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx2, mempl.TxInfo{}) assert.Nil(t, err) newValidatorPubKey3, err := css[nVals+2].privValidator.GetPubKey() require.NoError(t, err) newVal3ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey3) require.NoError(t, err) newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower) - err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil, mempl.TxInfo{}) + _, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx3, mempl.TxInfo{}) assert.Nil(t, err) }) @@ -545,7 +543,7 @@ func TestSimulateValidatorsChange(t *testing.T) { newVal3ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey3) require.NoError(t, err) removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0) - err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil, mempl.TxInfo{}) + _, err = assertMempool(css[0].txNotifier).CheckTxSync(removeValidatorTx3, mempl.TxInfo{}) assert.Nil(t, err) }) diff --git a/consensus/state_test.go b/consensus/state_test.go index a416ffad5..3b6949c4f 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -2347,7 +2347,7 @@ func addValidator(cs *State, vssMap map[string]*validatorStub, height int64) { panic("failed to convert newVal to protobuf") } newValidatorTx := kvstore.MakeValSetChangeTx(val.PubKey, 10) - _ = assertMempool(cs.txNotifier).CheckTx(newValidatorTx, nil, mempl.TxInfo{}) + _, _ = assertMempool(cs.txNotifier).CheckTxSync(newValidatorTx, mempl.TxInfo{}) vssMap[newVal.PubKey.Address().String()] = newValidatorStub(privVal, int32(len(vssMap)+1)) vssMap[newVal.PubKey.Address().String()].Height = height } diff --git a/mempool/bench_test.go b/mempool/bench_test.go index 18767408a..2cfe94e3e 100644 --- a/mempool/bench_test.go +++ b/mempool/bench_test.go @@ -18,9 +18,7 @@ func BenchmarkReap(b *testing.B) { for i := 0; i < size; i++ { tx := make([]byte, 8) binary.BigEndian.PutUint64(tx, uint64(i)) - if err := mempool.CheckTx(tx, nil, TxInfo{}); err != nil { - b.Error(err) - } + mempool.CheckTxSync(tx, TxInfo{}) // nolint: errcheck } b.ResetTimer() for i := 0; i < b.N; i++ { @@ -28,7 +26,38 @@ func BenchmarkReap(b *testing.B) { } } -func BenchmarkCheckTx(b *testing.B) { +func BenchmarkReapWithCheckTxAsync(b *testing.B) { + app := kvstore.NewApplication() + cc := proxy.NewLocalClientCreator(app) + mempool, cleanup := newMempoolWithApp(cc) + defer cleanup() + + size := 10000 + for i := 0; i < size; i++ { + tx := make([]byte, 8) + binary.BigEndian.PutUint64(tx, uint64(i)) + mempool.CheckTxAsync(tx, TxInfo{}, nil) // nolint: errcheck + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + mempool.ReapMaxBytesMaxGas(100000000, 10000000) + } +} + +func BenchmarkCheckTxSync(b *testing.B) { + app := kvstore.NewApplication() + cc := proxy.NewLocalClientCreator(app) + mempool, cleanup := newMempoolWithApp(cc) + defer cleanup() + + for i := 0; i < b.N; i++ { + tx := make([]byte, 8) + binary.BigEndian.PutUint64(tx, uint64(i)) + mempool.CheckTxSync(tx, TxInfo{}) // nolint: errcheck + } +} + +func BenchmarkCheckTxAsync(b *testing.B) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) mempool, cleanup := newMempoolWithApp(cc) @@ -37,9 +66,7 @@ func BenchmarkCheckTx(b *testing.B) { for i := 0; i < b.N; i++ { tx := make([]byte, 8) binary.BigEndian.PutUint64(tx, uint64(i)) - if err := mempool.CheckTx(tx, nil, TxInfo{}); err != nil { - b.Error(err) - } + mempool.CheckTxAsync(tx, TxInfo{}, nil) // nolint: errcheck } } diff --git a/mempool/cache_test.go b/mempool/cache_test.go index e560e8163..072c3f03a 100644 --- a/mempool/cache_test.go +++ b/mempool/cache_test.go @@ -59,7 +59,7 @@ func TestCacheAfterUpdate(t *testing.T) { for tcIndex, tc := range tests { for i := 0; i < tc.numTxsToCreate; i++ { tx := types.Tx{byte(i)} - err := mempool.CheckTx(tx, nil, TxInfo{}) + _, err := mempool.CheckTxSync(tx, TxInfo{}) require.NoError(t, err) } @@ -74,7 +74,7 @@ func TestCacheAfterUpdate(t *testing.T) { for _, v := range tc.reAddIndices { tx := types.Tx{byte(v)} - _ = mempool.CheckTx(tx, nil, TxInfo{}) + _, _ = mempool.CheckTxSync(tx, TxInfo{}) } cache := mempool.cache.(*mapTxCache) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 5d6b49c70..d1382310d 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -1,7 +1,6 @@ package mempool import ( - "bytes" "container/list" "crypto/sha256" "fmt" @@ -58,12 +57,6 @@ type CListMempool struct { txs *clist.CList // concurrent linked-list of good txs proxyAppConn proxy.AppConnMempool - // Track whether we're rechecking txs. - // These are not protected by a mutex and are expected to be mutated in - // serial (ie. by abci responses which are called in serial). - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here - // Map for quick access to txs to record sender in CheckTx. // txsMap: txKey -> CElement txsMap sync.Map @@ -90,14 +83,12 @@ func NewCListMempool( options ...CListMempoolOption, ) *CListMempool { mempool := &CListMempool{ - config: config, - proxyAppConn: proxyAppConn, - txs: clist.New(), - height: height, - recheckCursor: nil, - recheckEnd: nil, - logger: log.NewNopLogger(), - metrics: NopMetrics(), + config: config, + proxyAppConn: proxyAppConn, + txs: clist.New(), + height: height, + logger: log.NewNopLogger(), + metrics: NopMetrics(), } if config.CacheSize > 0 { mempool.cache = newMapTxCache(config.CacheSize) @@ -222,17 +213,67 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} { return mem.txs.WaitChan() } +// It blocks if we're waiting on Update() or Reap(). +// Safe for concurrent use by multiple goroutines. +func (mem *CListMempool) CheckTxSync(tx types.Tx, txInfo TxInfo) (res *abci.Response, err error) { + mem.updateMtx.RLock() + // use defer to unlock mutex because application (*local client*) might panic + defer mem.updateMtx.RUnlock() + + if err = mem.prepareCheckTx(tx, txInfo); err != nil { + return res, err + } + + // CONTRACT: `app.CheckTxSync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas) + var r *abci.ResponseCheckTx + r, err = mem.proxyAppConn.CheckTxSync(abci.RequestCheckTx{Tx: tx}) + if err != nil { + return res, err + } + + // TODO refactor to pass a `pointer` directly + res = abci.ToResponseCheckTx(*r) + mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, nil) + return res, err +} + // It blocks if we're waiting on Update() or Reap(). // cb: A callback from the CheckTx command. // It gets called from another goroutine. // CONTRACT: Either cb will get called, or err returned. // // Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error { +func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci.Response)) (err error) { mem.updateMtx.RLock() // use defer to unlock mutex because application (*local client*) might panic - defer mem.updateMtx.RUnlock() + defer func() { + if err != nil { + mem.updateMtx.RUnlock() + return + } + + if r := recover(); r != nil { + mem.updateMtx.RUnlock() + panic(r) + } + }() + + if err = mem.prepareCheckTx(tx, txInfo); err != nil { + return err + } + + // CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas) + reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) + reqRes.SetCallback(func(res *abci.Response) { + mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, cb) + mem.updateMtx.RUnlock() + }) + + return err +} +// CONTRACT: `caller` should held `mem.updateMtx.RLock()` +func (mem *CListMempool) prepareCheckTx(tx types.Tx, txInfo TxInfo) error { txSize := len(tx) if err := mem.isFull(txSize); err != nil { @@ -289,10 +330,6 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx return err } - // CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas) - reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) - reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb)) - return nil } @@ -306,15 +343,18 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx // When rechecking, we don't need the peerID, so the recheck callback happens // here. func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) { - if mem.recheckCursor == nil { + checkTxReq := req.GetCheckTx() + if checkTxReq == nil { return } - mem.metrics.RecheckCount.Add(1) - mem.resCbRecheck(req, res) + if checkTxReq.Type == abci.CheckTxType_Recheck { + mem.metrics.RecheckCount.Add(1) + mem.resCbRecheck(req, res) - // update metrics - mem.metrics.Size.Set(float64(mem.Size())) + // update metrics + mem.metrics.Size.Set(float64(mem.Size())) + } } // Request specific callback that should be set on individual reqRes objects @@ -330,23 +370,17 @@ func (mem *CListMempool) reqResCb( tx []byte, peerID uint16, peerP2PID p2p.ID, + res *abci.Response, externalCb func(*abci.Response), -) func(res *abci.Response) { - return func(res *abci.Response) { - if mem.recheckCursor != nil { - // this should never happen - panic("recheck cursor is not nil in reqResCb") - } - - mem.resCbFirstTime(tx, peerID, peerP2PID, res) +) { + mem.resCbFirstTime(tx, peerID, peerP2PID, res) - // update metrics - mem.metrics.Size.Set(float64(mem.Size())) + // update metrics + mem.metrics.Size.Set(float64(mem.Size())) - // passed in by the caller of CheckTx, eg. the RPC - if externalCb != nil { - externalCb(res) - } + // passed in by the caller of CheckTx, eg. the RPC + if externalCb != nil { + externalCb(res) } } @@ -481,34 +515,19 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { switch r := res.Value.(type) { case *abci.Response_CheckTx: tx := req.GetCheckTx().Tx - memTx := mem.recheckCursor.Value.(*mempoolTx) - if !bytes.Equal(tx, memTx.tx) { - panic(fmt.Sprintf( - "Unexpected tx response from proxy during recheck\nExpected %X, got %X", - memTx.tx, - tx)) - } - if r.CheckTx.Code == abci.CodeTypeOK { - // Good, nothing to do. - } else { - // Tx became invalidated due to newly committed block. - mem.logger.Debug("tx is no longer valid", "tx", txID(tx), "res", r) - // NOTE: we remove tx from the cache because it might be good later - mem.removeTx(tx, mem.recheckCursor, !mem.config.KeepInvalidTxsInCache) - } - if mem.recheckCursor == mem.recheckEnd { - mem.recheckCursor = nil - } else { - mem.recheckCursor = mem.recheckCursor.Next() - } - if mem.recheckCursor == nil { - // Done! - mem.logger.Info("Done rechecking txs") - - // incase the recheck removed all txs - if mem.Size() > 0 { - mem.notifyTxsAvailable() + txHash := TxKey(tx) + if e, ok := mem.txsMap.Load(txHash); ok { + celem := e.(*clist.CElement) + if r.CheckTx.Code == abci.CodeTypeOK { + // Good, nothing to do. + } else { + // Tx became invalidated due to newly committed block. + mem.logger.Info("tx is no longer valid", "tx", txID(tx), "res", r) + // NOTE: we remove tx from the cache because it might be good later + mem.removeTx(tx, celem, true) } + } else { + panic(fmt.Sprintf("unexpected tx response from proxy during recheck\ntxHash=%X, tx=%X", txHash, tx)) } default: // ignore other messages @@ -650,8 +669,10 @@ func (mem *CListMempool) Update( // At this point, mem.txs are being rechecked. // mem.recheckCursor re-scans mem.txs and possibly removes some txs. // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. - } else if mem.Size() > 0 { - // just notify there're some txs left. + } + + // notify there're some txs left. + if mem.Size() > 0 { mem.notifyTxsAvailable() } @@ -666,20 +687,25 @@ func (mem *CListMempool) recheckTxs() { return } - mem.recheckCursor = mem.txs.Front() - mem.recheckEnd = mem.txs.Back() + wg := sync.WaitGroup{} // Push txs to proxyAppConn // NOTE: globalCb may be called concurrently. for e := mem.txs.Front(); e != nil; e = e.Next() { + wg.Add(1) + memTx := e.Value.(*mempoolTx) - mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{ + reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{ Tx: memTx.tx, Type: abci.CheckTxType_Recheck, }) + reqRes.SetCallback(func(res *abci.Response) { + wg.Done() + }) } mem.proxyAppConn.FlushAsync() + wg.Wait() } //-------------------------------------------------------------------------------- diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index d72394d49..1f0b7c5c1 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -77,7 +77,7 @@ func checkTxs(t *testing.T, mempool Mempool, count int, peerID uint16) types.Txs if err != nil { t.Error(err) } - if err := mempool.CheckTx(txBytes, nil, txInfo); err != nil { + if _, err := mempool.CheckTxSync(txBytes, txInfo); err != nil { // Skip invalid txs. // TestMempoolFilters will fail otherwise. It asserts a number of txs // returned. @@ -100,7 +100,8 @@ func TestReapMaxBytesMaxGas(t *testing.T) { checkTxs(t, mempool, 1, UnknownPeerID) tx0 := mempool.TxsFront().Value.(*mempoolTx) // assert that kv store has gas wanted = 1. - require.Equal(t, app.CheckTx(abci.RequestCheckTx{Tx: tx0.tx}).GasWanted, int64(1), "KVStore had a gas value neq to 1") + require.Equal(t, + app.CheckTxSync(abci.RequestCheckTx{Tx: tx0.tx}).GasWanted, int64(1), "KVStore had a gas value neq to 1") require.Equal(t, tx0.gasWanted, int64(1), "transactions gas was set incorrectly") // ensure each tx is 20 bytes long require.Equal(t, len(tx0.tx), 20, "Tx is longer than 20 bytes") @@ -182,7 +183,7 @@ func TestMempoolUpdate(t *testing.T) { err := mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x01}}), abciResponses(1, abci.CodeTypeOK), nil) require.NoError(t, err) - err = mempool.CheckTx([]byte{0x01}, nil, TxInfo{}) + _, err = mempool.CheckTxSync([]byte{0x01}, TxInfo{}) if assert.Error(t, err) { assert.Equal(t, ErrTxInCache, err) } @@ -190,7 +191,7 @@ func TestMempoolUpdate(t *testing.T) { // 2. Removes valid txs from the mempool { - err := mempool.CheckTx([]byte{0x02}, nil, TxInfo{}) + _, err := mempool.CheckTxSync([]byte{0x02}, TxInfo{}) require.NoError(t, err) err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x02}}), abciResponses(1, abci.CodeTypeOK), nil) require.NoError(t, err) @@ -199,13 +200,13 @@ func TestMempoolUpdate(t *testing.T) { // 3. Removes invalid transactions from the cache and the mempool (if present) { - err := mempool.CheckTx([]byte{0x03}, nil, TxInfo{}) + _, err := mempool.CheckTxSync([]byte{0x03}, TxInfo{}) require.NoError(t, err) err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x03}}), abciResponses(1, 1), nil) require.NoError(t, err) assert.Zero(t, mempool.Size()) - err = mempool.CheckTx([]byte{0x03}, nil, TxInfo{}) + _, err = mempool.CheckTxSync([]byte{0x03}, TxInfo{}) require.NoError(t, err) } } @@ -226,7 +227,7 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) { b := make([]byte, 8) binary.BigEndian.PutUint64(b, 1) - err := mempool.CheckTx(b, nil, TxInfo{}) + _, err := mempool.CheckTxSync(b, TxInfo{}) require.NoError(t, err) // simulate new block @@ -237,13 +238,13 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) { require.NoError(t, err) // a must be added to the cache - err = mempool.CheckTx(a, nil, TxInfo{}) + _, err = mempool.CheckTxSync(a, TxInfo{}) if assert.Error(t, err) { assert.Equal(t, ErrTxInCache, err) } // b must remain in the cache - err = mempool.CheckTx(b, nil, TxInfo{}) + _, err = mempool.CheckTxSync(b, TxInfo{}) if assert.Error(t, err) { assert.Equal(t, ErrTxInCache, err) } @@ -257,10 +258,10 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) { // remove a from the cache to test (2) mempool.cache.Remove(a) - err := mempool.CheckTx(a, nil, TxInfo{}) + _, err := mempool.CheckTxSync(a, TxInfo{}) require.NoError(t, err) - err = mempool.CheckTx(a, nil, TxInfo{}) + _, err = mempool.CheckTxSync(a, TxInfo{}) if assert.Error(t, err) { assert.Equal(t, ErrTxInCache, err) } @@ -300,7 +301,7 @@ func TestTxsAvailable(t *testing.T) { ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) // now call update with all the txs. it should not fire as there are no txs left - committedTxs = append(txs, moreTxs...) //nolint: gocritic + committedTxs = append(txs, moreTxs...) // nolint: gocritic if err := mempool.Update(newTestBlock(2, committedTxs), abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil { t.Error(err) @@ -334,7 +335,7 @@ func TestSerialReap(t *testing.T) { // This will succeed txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(i)) - err := mempool.CheckTx(txBytes, nil, TxInfo{}) + _, err := mempool.CheckTxSync(txBytes, TxInfo{}) _, cached := cacheMap[string(txBytes)] if cached { require.NotNil(t, err, "expected error for cached tx") @@ -344,7 +345,7 @@ func TestSerialReap(t *testing.T) { cacheMap[string(txBytes)] = struct{}{} // Duplicates are cached and should return error - err = mempool.CheckTx(txBytes, nil, TxInfo{}) + _, err = mempool.CheckTxSync(txBytes, TxInfo{}) require.NotNil(t, err, "Expected error after CheckTx on duplicated tx") } } @@ -390,7 +391,7 @@ func TestSerialReap(t *testing.T) { } } - //---------------------------------------- + // ---------------------------------------- // Deliver some txs. deliverTxsRange(0, 100) @@ -452,7 +453,7 @@ func TestMempoolCloseWAL(t *testing.T) { require.Equal(t, 1, len(m2), "expecting the wal match in") // 5. Write some contents to the WAL - err = mempool.CheckTx(types.Tx([]byte("foo")), nil, TxInfo{}) + _, err = mempool.CheckTxSync(types.Tx([]byte("foo")), TxInfo{}) require.NoError(t, err) walFilepath := mempool.wal.Path sum1 := checksumFile(walFilepath, t) @@ -463,7 +464,7 @@ func TestMempoolCloseWAL(t *testing.T) { // 7. Invoke CloseWAL() and ensure it discards the // WAL thus any other write won't go through. mempool.CloseWAL() - err = mempool.CheckTx(types.Tx([]byte("bar")), nil, TxInfo{}) + _, err = mempool.CheckTxSync(types.Tx([]byte("bar")), TxInfo{}) require.NoError(t, err) sum2 := checksumFile(walFilepath, t) require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded") @@ -502,7 +503,7 @@ func TestMempool_CheckTxChecksTxSize(t *testing.T) { tx := tmrand.Bytes(testCase.len) - err := mempl.CheckTx(tx, nil, TxInfo{}) + _, err := mempl.CheckTxSync(tx, TxInfo{}) bv := gogotypes.BytesValue{Value: tx} bz, err2 := bv.Marshal() require.NoError(t, err2) @@ -528,7 +529,7 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 0, mempool.TxsBytes()) // 2. len(tx) after CheckTx - err := mempool.CheckTx([]byte{0x01}, nil, TxInfo{}) + _, err := mempool.CheckTxSync([]byte{0x01}, TxInfo{}) require.NoError(t, err) assert.EqualValues(t, 1, mempool.TxsBytes()) @@ -539,7 +540,7 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 0, mempool.TxsBytes()) // 4. zero after Flush - err = mempool.CheckTx([]byte{0x02, 0x03}, nil, TxInfo{}) + _, err = mempool.CheckTxSync([]byte{0x02, 0x03}, TxInfo{}) require.NoError(t, err) assert.EqualValues(t, 2, mempool.TxsBytes()) @@ -547,9 +548,9 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 0, mempool.TxsBytes()) // 5. ErrMempoolIsFull is returned when/if MaxTxsBytes limit is reached. - err = mempool.CheckTx([]byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04}, nil, TxInfo{}) + _, err = mempool.CheckTxSync([]byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04}, TxInfo{}) require.NoError(t, err) - err = mempool.CheckTx([]byte{0x05}, nil, TxInfo{}) + _, err = mempool.CheckTxSync([]byte{0x05}, TxInfo{}) if assert.Error(t, err) { assert.IsType(t, ErrMempoolIsFull{}, err) } @@ -563,7 +564,7 @@ func TestMempoolTxsBytes(t *testing.T) { txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(0)) - err = mempool.CheckTx(txBytes, nil, TxInfo{}) + _, err = mempool.CheckTxSync(txBytes, TxInfo{}) require.NoError(t, err) assert.EqualValues(t, 8, mempool.TxsBytes()) @@ -589,7 +590,7 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 0, mempool.TxsBytes()) // 7. Test RemoveTxByKey function - err = mempool.CheckTx([]byte{0x06}, nil, TxInfo{}) + _, err = mempool.CheckTxSync([]byte{0x06}, TxInfo{}) require.NoError(t, err) assert.EqualValues(t, 1, mempool.TxsBytes()) mempool.RemoveTxByKey(TxKey([]byte{0x07}), true) @@ -633,7 +634,7 @@ func TestMempoolRemoteAppConcurrency(t *testing.T) { tx := txs[txNum] // this will err with ErrTxInCache many times ... - mempool.CheckTx(tx, nil, TxInfo{SenderID: uint16(peerID)}) //nolint: errcheck // will error + mempool.CheckTxSync(tx, TxInfo{SenderID: uint16(peerID)}) // nolint: errcheck } err := mempool.FlushAppConn() require.NoError(t, err) @@ -669,9 +670,10 @@ func newRemoteApp( } return clientCreator, server } + func checksumIt(data []byte) string { h := sha256.New() - h.Write(data) //nolint: errcheck // ignore errcheck + h.Write(data) // nolint: errcheck // ignore errcheck return fmt.Sprintf("%x", h.Sum(nil)) } diff --git a/mempool/mempool.go b/mempool/mempool.go index b749ce9d3..01ed2508e 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -15,7 +15,8 @@ import ( type Mempool interface { // CheckTx executes a new transaction against the application to determine // its validity and whether it should be added to the mempool. - CheckTx(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error + CheckTxSync(tx types.Tx, txInfo TxInfo) (*abci.Response, error) + CheckTxAsync(tx types.Tx, txInfo TxInfo, callback func(*abci.Response)) error // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes // bytes total with the condition that the total gasWanted must be less than diff --git a/mempool/mock/mempool.go b/mempool/mock/mempool.go index 9b9f9efe3..1bed7c125 100644 --- a/mempool/mock/mempool.go +++ b/mempool/mock/mempool.go @@ -15,7 +15,10 @@ var _ mempl.Mempool = Mempool{} func (Mempool) Lock() {} func (Mempool) Unlock() {} func (Mempool) Size() int { return 0 } -func (Mempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error { +func (Mempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) { + return nil, nil +} +func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error { return nil } func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } diff --git a/mempool/reactor.go b/mempool/reactor.go index 9120b15a9..b6c06cb98 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -183,7 +183,7 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { txInfo.SenderP2PID = src.ID() } for _, tx := range msg.Txs { - err = memR.mempool.CheckTx(tx, nil, txInfo) + err = memR.mempool.CheckTxAsync(tx, txInfo, nil) if err != nil { memR.Logger.Info("Could not check tx", "tx", txID(tx), "err", err) } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index ebdbeca3b..866a17114 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -176,7 +176,7 @@ func TestReactor_MaxTxBytes(t *testing.T) { // Broadcast a tx, which has the max size // => ensure it's received by the second reactor. tx1 := tmrand.Bytes(config.Mempool.MaxTxBytes) - err := reactors[0].mempool.CheckTx(tx1, nil, TxInfo{SenderID: UnknownPeerID}) + _, err := reactors[0].mempool.CheckTxSync(tx1, TxInfo{SenderID: UnknownPeerID}) require.NoError(t, err) waitForTxsOnReactors(t, []types.Tx{tx1}, reactors) @@ -186,7 +186,7 @@ func TestReactor_MaxTxBytes(t *testing.T) { // Broadcast a tx, which is beyond the max size // => ensure it's not sent tx2 := tmrand.Bytes(config.Mempool.MaxTxBytes + 1) - err = reactors[0].mempool.CheckTx(tx2, nil, TxInfo{SenderID: UnknownPeerID}) + _, err = reactors[0].mempool.CheckTxSync(tx2, TxInfo{SenderID: UnknownPeerID}) require.Error(t, err) } diff --git a/node/node_test.go b/node/node_test.go index df729ff75..2e6967adc 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -278,7 +278,7 @@ func TestCreateProposalBlock(t *testing.T) { txLength := 100 for i := 0; i <= maxBytes/txLength; i++ { tx := tmrand.Bytes(txLength) - err := mempool.CheckTx(tx, nil, mempl.TxInfo{}) + _, err := mempool.CheckTxSync(tx, mempl.TxInfo{}) assert.NoError(t, err) } @@ -350,7 +350,7 @@ func TestMaxProposalBlockSize(t *testing.T) { // fill the mempool with one txs just below the maximum size txLength := int(types.MaxDataBytesNoEvidence(maxBytes, 1)) tx := tmrand.Bytes(txLength - 4) // to account for the varint - err = mempool.CheckTx(tx, nil, mempl.TxInfo{}) + _, err = mempool.CheckTxSync(tx, mempl.TxInfo{}) assert.NoError(t, err) blockExec := sm.NewBlockExecutor( diff --git a/rpc/client/mock/abci.go b/rpc/client/mock/abci.go index 52bd4bccd..dbeafbfc2 100644 --- a/rpc/client/mock/abci.go +++ b/rpc/client/mock/abci.go @@ -51,7 +51,7 @@ func (a ABCIApp) ABCIQueryWithOptions( // TODO: Make it wait for a commit and set res.Height appropriately. func (a ABCIApp) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { res := ctypes.ResultBroadcastTxCommit{} - res.CheckTx = a.App.CheckTx(abci.RequestCheckTx{Tx: tx}) + res.CheckTx = a.App.CheckTxSync(abci.RequestCheckTx{Tx: tx}) if res.CheckTx.IsErr() { return &res, nil } @@ -61,11 +61,13 @@ func (a ABCIApp) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.Re } func (a ABCIApp) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - c := a.App.CheckTx(abci.RequestCheckTx{Tx: tx}) + chRes := make(chan abci.ResponseCheckTx, 1) + a.App.CheckTxAsync(abci.RequestCheckTx{Tx: tx}, func(res abci.ResponseCheckTx) { + chRes <- res + }) + c := <-chRes // and this gets written in a background thread... - if !c.IsErr() { - go func() { a.App.DeliverTx(abci.RequestDeliverTx{Tx: tx}) }() - } + go func() { a.App.DeliverTx(abci.RequestDeliverTx{Tx: tx}) }() return &ctypes.ResultBroadcastTx{ Code: c.Code, Data: c.Data, @@ -76,7 +78,7 @@ func (a ABCIApp) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*ctypes.Res } func (a ABCIApp) BroadcastTxSync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - c := a.App.CheckTx(abci.RequestCheckTx{Tx: tx}) + c := a.App.CheckTxSync(abci.RequestCheckTx{Tx: tx}) // and this gets written in a background thread... if !c.IsErr() { go func() { a.App.DeliverTx(abci.RequestDeliverTx{Tx: tx}) }() diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index c39bcb0f2..8c021e674 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -351,7 +351,7 @@ func TestUnconfirmedTxs(t *testing.T) { ch := make(chan *abci.Response, 1) mempool := node.Mempool() - err := mempool.CheckTx(tx, func(resp *abci.Response) { ch <- resp }, mempl.TxInfo{}) + err := mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(resp *abci.Response) { ch <- resp }) require.NoError(t, err) // wait for tx to arrive in mempoool. @@ -381,7 +381,7 @@ func TestNumUnconfirmedTxs(t *testing.T) { ch := make(chan *abci.Response, 1) mempool := node.Mempool() - err := mempool.CheckTx(tx, func(resp *abci.Response) { ch <- resp }, mempl.TxInfo{}) + err := mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(resp *abci.Response) { ch <- resp }) require.NoError(t, err) // wait for tx to arrive in mempoool. diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 760caa0b7..393fe055d 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -20,7 +20,7 @@ import ( // CheckTx nor DeliverTx results. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - err := env.Mempool.CheckTx(tx, nil, mempl.TxInfo{}) + err := env.Mempool.CheckTxAsync(tx, mempl.TxInfo{}, nil) if err != nil { return nil, err @@ -32,14 +32,10 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca // DeliverTx result. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - resCh := make(chan *abci.Response, 1) - err := env.Mempool.CheckTx(tx, func(res *abci.Response) { - resCh <- res - }, mempl.TxInfo{}) + res, err := env.Mempool.CheckTxSync(tx, mempl.TxInfo{}) if err != nil { return nil, err } - res := <-resCh r := res.GetCheckTx() return &ctypes.ResultBroadcastTx{ Code: r.Code, @@ -77,16 +73,12 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc } }() - // Broadcast tx and wait for CheckTx result - checkTxResCh := make(chan *abci.Response, 1) - err = env.Mempool.CheckTx(tx, func(res *abci.Response) { - checkTxResCh <- res - }, mempl.TxInfo{}) + // Broadcast tx and check tx + checkTxResMsg, err := env.Mempool.CheckTxSync(tx, mempl.TxInfo{}) if err != nil { env.Logger.Error("Error on broadcastTxCommit", "err", err) return nil, fmt.Errorf("error on broadcastTxCommit: %v", err) } - checkTxResMsg := <-checkTxResCh checkTxRes := checkTxResMsg.GetCheckTx() if checkTxRes.Code != abci.CodeTypeOK { return &ctypes.ResultBroadcastTxCommit{ diff --git a/state/helpers_test.go b/state/helpers_test.go index f2fe21f59..7878f1069 100644 --- a/state/helpers_test.go +++ b/state/helpers_test.go @@ -281,10 +281,14 @@ func (app *testApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx return abci.ResponseDeliverTx{Events: []abci.Event{}} } -func (app *testApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { +func (app *testApp) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx { return abci.ResponseCheckTx{} } +func (app *testApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) { + callback(abci.ResponseCheckTx{}) +} + func (app *testApp) Commit() abci.ResponseCommit { return abci.ResponseCommit{RetainHeight: 1} } diff --git a/test/fuzz/mempool/checktx.go b/test/fuzz/mempool/checktx.go index 320a393ee..f6f0e25e3 100644 --- a/test/fuzz/mempool/checktx.go +++ b/test/fuzz/mempool/checktx.go @@ -25,7 +25,7 @@ func init() { } func Fuzz(data []byte) int { - err := mempool.CheckTx(data, nil, mempl.TxInfo{}) + _, err := mempool.CheckTxSync(data, mempl.TxInfo{}) if err != nil { return 0 } diff --git a/test/maverick/consensus/replay_stubs.go b/test/maverick/consensus/replay_stubs.go index adab0edf8..cdeaf701f 100644 --- a/test/maverick/consensus/replay_stubs.go +++ b/test/maverick/consensus/replay_stubs.go @@ -18,7 +18,10 @@ var _ mempl.Mempool = emptyMempool{} func (emptyMempool) Lock() {} func (emptyMempool) Unlock() {} func (emptyMempool) Size() int { return 0 } -func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error { +func (emptyMempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) { + return nil, nil +} +func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error { return nil } func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }