From 706d0de731db413ca7aac728482dba45c6bd7bb9 Mon Sep 17 00:00:00 2001 From: "KIm, JinSan" Date: Wed, 6 Jan 2021 15:17:49 +0900 Subject: [PATCH] chore: remove mempool.postCheck (#158) * fix: error handling after check tx * fix: typo * chore: (mempool) remove postCheck and impl reserve * chore: fix tests * chore: revise log (remove checkTx.Code) * chore: add `CONTRACT` for `mem.proxyAppConn.CheckTxAsync()` * chore: revise numTxs, txsBytes for `ErrMempoolIsFull` in reserve() * chore: revise to remove redundant `isFull()` * fix: remove tx from cache when `app errors` or `failed to reserve` * Revert "chore: revise to remove redundant `isFull()`" This reverts commit 55990ec6d2557307497d14e302ee92b8f2081d83. --- mempool/cache_test.go | 2 +- mempool/clist_mempool.go | 84 +++++++++++++++++++++-------------- mempool/clist_mempool_test.go | 37 ++++++--------- mempool/mempool.go | 25 ----------- mock/mempool.go | 1 - node/node.go | 1 - node/node_test.go | 1 - state/execution.go | 8 +--- state/tx_filter.go | 6 --- 9 files changed, 67 insertions(+), 98 deletions(-) diff --git a/mempool/cache_test.go b/mempool/cache_test.go index 99bbba406..bb4642252 100644 --- a/mempool/cache_test.go +++ b/mempool/cache_test.go @@ -67,7 +67,7 @@ func TestCacheAfterUpdate(t *testing.T) { tx := types.Tx{byte(v)} updateTxs = append(updateTxs, tx) } - mempool.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil) + mempool.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil) for _, v := range tc.reAddIndices { tx := types.Tx{byte(v)} diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 55e3a2327..ab3d56066 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -33,6 +33,10 @@ type CListMempool struct { height int64 // the last block Update()'d to txsBytes int64 // total size of mempool, in bytes + reserved int // the number of checking tx and it should be considered when checking mempool full + reservedBytes int64 // size of checking tx and it should be considered when checking mempool full + reservedMtx sync.Mutex + // notify listeners (ie. consensus) when txs are available notifiedTxsAvailable bool txsAvailable chan struct{} // fires once for each height, when the mempool is not empty @@ -43,7 +47,6 @@ type CListMempool struct { // CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods. updateMtx sync.RWMutex preCheck PreCheckFunc - postCheck PostCheckFunc wal *auto.AutoFile // a log of mempool txs txs *clist.CList // concurrent linked-list of good txs @@ -118,12 +121,6 @@ func WithPreCheck(f PreCheckFunc) CListMempoolOption { return func(mem *CListMempool) { mem.preCheck = f } } -// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns -// false. This is ran after CheckTx. -func WithPostCheck(f PostCheckFunc) CListMempoolOption { - return func(mem *CListMempool) { mem.postCheck = f } -} - // WithMetrics sets the metrics. func WithMetrics(metrics *Metrics) CListMempoolOption { return func(mem *CListMempool) { mem.metrics = metrics } @@ -283,9 +280,19 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx // NOTE: proxyAppConn may error if tx buffer is full if err := mem.proxyAppConn.Error(); err != nil { + // remove from cache + mem.cache.Remove(tx) + return err + } + + // reserve mempool that should be called just before calling `mem.proxyAppConn.CheckTxAsync()` + if err := mem.reserve(int64(txSize)); err != nil { + // remove from cache + mem.cache.Remove(tx) return err } + // CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas) reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb)) @@ -385,6 +392,35 @@ func (mem *CListMempool) isFull(txSize int) error { return nil } +func (mem *CListMempool) reserve(txSize int64) error { + mem.reservedMtx.Lock() + defer mem.reservedMtx.Unlock() + + var ( + memSize = mem.Size() + txsBytes = mem.TxsBytes() + ) + + if memSize+mem.reserved >= mem.config.Size || txSize+mem.reservedBytes+txsBytes > mem.config.MaxTxsBytes { + return ErrMempoolIsFull{ + memSize + mem.reserved, mem.config.Size, + txsBytes + mem.reservedBytes, mem.config.MaxTxsBytes, + } + } + + mem.reserved++ + mem.reservedBytes += txSize + return nil +} + +func (mem *CListMempool) releaseReserve(txSize int64) { + mem.reservedMtx.Lock() + defer mem.reservedMtx.Unlock() + + mem.reserved-- + mem.reservedBytes -= txSize +} + // callback, which is called after the app checked the tx for the first time. // // The case where the app checks the tx for the second and subsequent times is @@ -397,20 +433,7 @@ func (mem *CListMempool) resCbFirstTime( ) { switch r := res.Value.(type) { case *abci.Response_CheckTx: - var postCheckErr error - if mem.postCheck != nil { - postCheckErr = mem.postCheck(tx, r.CheckTx) - } - if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { - // Check mempool isn't full again to reduce the chance of exceeding the - // limits. - if err := mem.isFull(len(tx)); err != nil { - // remove from cache (mempool might have a space later) - mem.cache.Remove(tx) - mem.logger.Error(err.Error()) - return - } - + if r.CheckTx.Code == abci.CodeTypeOK { memTx := &mempoolTx{ height: mem.height, gasWanted: r.CheckTx.GasWanted, @@ -428,11 +451,14 @@ func (mem *CListMempool) resCbFirstTime( } else { // ignore bad transaction mem.logger.Info("Rejected bad transaction", - "tx", txID(tx), "peerID", peerP2PID, "res", r, "err", postCheckErr) + "tx", txID(tx), "peerID", peerP2PID, "res", r) mem.metrics.FailedTxs.Add(1) // remove from cache (it might be good later) mem.cache.Remove(tx) } + + // release `reserve` regardless it's OK or not (it might be good later) + mem.releaseReserve(int64(len(tx))) default: // ignore other messages } @@ -453,15 +479,11 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { memTx.tx, tx)) } - var postCheckErr error - if mem.postCheck != nil { - postCheckErr = mem.postCheck(tx, r.CheckTx) - } - if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { + if r.CheckTx.Code == abci.CodeTypeOK { // Good, nothing to do. } else { // Tx became invalidated due to newly committed block. - mem.logger.Info("Tx is no longer valid", "tx", txID(tx), "res", r, "err", postCheckErr) + mem.logger.Info("Tx is no longer valid", "tx", txID(tx), "res", r) // NOTE: we remove tx from the cache because it might be good later mem.removeTx(tx, mem.recheckCursor, true) } @@ -555,13 +577,12 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { return txs } -// Lock() must be help by the caller during execution. +// Lock() must be held by the caller during execution. func (mem *CListMempool) Update( height int64, txs types.Txs, deliverTxResponses []*abci.ResponseDeliverTx, preCheck PreCheckFunc, - postCheck PostCheckFunc, ) error { // Set height mem.height = height @@ -570,9 +591,6 @@ func (mem *CListMempool) Update( if preCheck != nil { mem.preCheck = preCheck } - if postCheck != nil { - mem.postCheck = postCheck - } for i, tx := range txs { if deliverTxResponses[i].Code == abci.CodeTypeOK { diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 17ab83f33..2dbe3aa2e 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -147,31 +147,22 @@ func TestMempoolFilters(t *testing.T) { emptyTxArr := []types.Tx{[]byte{}} nopPreFilter := func(tx types.Tx) error { return nil } - nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil } // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs. // each tx has 20 bytes + amino overhead = 21 bytes, 1 gas tests := []struct { numTxsToCreate int preFilter PreCheckFunc - postFilter PostCheckFunc expectedNumTxs int }{ - {10, nopPreFilter, nopPostFilter, 10}, - {10, PreCheckAminoMaxBytes(10), nopPostFilter, 0}, - {10, PreCheckAminoMaxBytes(20), nopPostFilter, 0}, - {10, PreCheckAminoMaxBytes(22), nopPostFilter, 10}, - {10, nopPreFilter, PostCheckMaxGas(-1), 10}, - {10, nopPreFilter, PostCheckMaxGas(0), 0}, - {10, nopPreFilter, PostCheckMaxGas(1), 10}, - {10, nopPreFilter, PostCheckMaxGas(3000), 10}, - {10, PreCheckAminoMaxBytes(10), PostCheckMaxGas(20), 0}, - {10, PreCheckAminoMaxBytes(30), PostCheckMaxGas(20), 10}, - {10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(1), 10}, - {10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(0), 0}, + {10, nopPreFilter, 10}, + {10, PreCheckAminoMaxBytes(10), 0}, + {10, PreCheckAminoMaxBytes(20), 0}, + {10, PreCheckAminoMaxBytes(22), 10}, + {10, PreCheckAminoMaxBytes(30), 10}, } for tcIndex, tt := range tests { - mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter) + mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter) checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID) require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex) mempool.Flush() @@ -186,7 +177,7 @@ func TestMempoolUpdate(t *testing.T) { // 1. Adds valid txs to the cache { - mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil) + mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil) err := mempool.CheckTx([]byte{0x01}, nil, TxInfo{}) if assert.Error(t, err) { assert.Equal(t, ErrTxInCache, err) @@ -197,7 +188,7 @@ func TestMempoolUpdate(t *testing.T) { { err := mempool.CheckTx([]byte{0x02}, nil, TxInfo{}) require.NoError(t, err) - mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil) + mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil) assert.Zero(t, mempool.Size()) } @@ -205,7 +196,7 @@ func TestMempoolUpdate(t *testing.T) { { err := mempool.CheckTx([]byte{0x03}, nil, TxInfo{}) require.NoError(t, err) - mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil) + mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil) assert.Zero(t, mempool.Size()) err = mempool.CheckTx([]byte{0x03}, nil, TxInfo{}) @@ -234,7 +225,7 @@ func TestTxsAvailable(t *testing.T) { // it should fire once now for the new height // since there are still txs left committedTxs, txs := txs[:50], txs[50:] - if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { + if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil { t.Error(err) } ensureFire(t, mempool.TxsAvailable(), timeoutMS) @@ -246,7 +237,7 @@ func TestTxsAvailable(t *testing.T) { // now call update with all the txs. it should not fire as there are no txs left committedTxs = append(txs, moreTxs...) //nolint: gocritic - if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { + if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil { t.Error(err) } ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) @@ -305,7 +296,7 @@ func TestSerialReap(t *testing.T) { binary.BigEndian.PutUint64(txBytes, uint64(i)) txs = append(txs, txBytes) } - if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil { + if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil); err != nil { t.Error(err) } } @@ -489,7 +480,7 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 1, mempool.TxsBytes()) // 3. zero again after tx is removed by Update - mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil) + mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil) assert.EqualValues(t, 0, mempool.TxsBytes()) // 4. zero after Flush @@ -534,7 +525,7 @@ func TestMempoolTxsBytes(t *testing.T) { require.NotEmpty(t, res2.Data) // Pretend like we committed nothing so txBytes gets rechecked and removed. - mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil, nil) + mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil) assert.EqualValues(t, 0, mempool.TxsBytes()) } diff --git a/mempool/mempool.go b/mempool/mempool.go index 68eec8674..fe52b1563 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -43,7 +43,6 @@ type Mempool interface { blockTxs types.Txs, deliverTxResponses []*abci.ResponseDeliverTx, newPreFn PreCheckFunc, - newPostFn PostCheckFunc, ) error // FlushAppConn flushes the mempool connection to ensure async reqResCb calls are @@ -85,11 +84,6 @@ type Mempool interface { // transaction doesn't exceeded the block size. type PreCheckFunc func(types.Tx) error -// PostCheckFunc is an optional filter executed after CheckTx and rejects -// transaction if false is returned. An example would be to ensure a -// transaction doesn't require more gas than available for the block. -type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error - // TxInfo are parameters that get passed when attempting to add a tx to the // mempool. type TxInfo struct { @@ -120,22 +114,3 @@ func PreCheckAminoMaxBytes(maxBytes int64) PreCheckFunc { return nil } } - -// PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed -// maxGas. Returns nil if maxGas is -1. -func PostCheckMaxGas(maxGas int64) PostCheckFunc { - return func(tx types.Tx, res *abci.ResponseCheckTx) error { - if maxGas == -1 { - return nil - } - if res.GasWanted < 0 { - return fmt.Errorf("gas wanted %d is negative", - res.GasWanted) - } - if res.GasWanted > maxGas { - return fmt.Errorf("gas wanted %d is greater than max gas %d", - res.GasWanted, maxGas) - } - return nil - } -} diff --git a/mock/mempool.go b/mock/mempool.go index be690efaa..9f1b8b1f8 100644 --- a/mock/mempool.go +++ b/mock/mempool.go @@ -25,7 +25,6 @@ func (Mempool) Update( _ types.Txs, _ []*abci.ResponseDeliverTx, _ mempl.PreCheckFunc, - _ mempl.PostCheckFunc, ) error { return nil } diff --git a/node/node.go b/node/node.go index 1ad026984..a1b768712 100644 --- a/node/node.go +++ b/node/node.go @@ -309,7 +309,6 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, state.LastBlockHeight, mempl.WithMetrics(memplMetrics), mempl.WithPreCheck(sm.TxPreCheck(state)), - mempl.WithPostCheck(sm.TxPostCheck(state)), ) mempoolLogger := logger.With("module", "mempool") mempoolReactor := mempl.NewReactor(config.Mempool, mempool) diff --git a/node/node_test.go b/node/node_test.go index cd52ad06e..3a20893fb 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -242,7 +242,6 @@ func TestCreateProposalBlock(t *testing.T) { state.LastBlockHeight, mempl.WithMetrics(memplMetrics), mempl.WithPreCheck(sm.TxPreCheck(state)), - mempl.WithPostCheck(sm.TxPostCheck(state)), ) mempool.SetLogger(logger) diff --git a/state/execution.go b/state/execution.go index 50a7afd89..dde88bad4 100644 --- a/state/execution.go +++ b/state/execution.go @@ -248,13 +248,7 @@ func (blockExec *BlockExecutor) Commit( // Update mempool. updateMempoolStartTime := time.Now().UnixNano() - err = blockExec.mempool.Update( - block.Height, - block.Txs, - deliverTxResponses, - TxPreCheck(state), - TxPostCheck(state), - ) + err = blockExec.mempool.Update(block.Height, block.Txs, deliverTxResponses, TxPreCheck(state)) updateMempoolEndTime := time.Now().UnixNano() updateMempoolTimeMs := float64(updateMempoolEndTime-updateMempoolStartTime) / 1000000 diff --git a/state/tx_filter.go b/state/tx_filter.go index a8c0627dc..0b18ba8b3 100644 --- a/state/tx_filter.go +++ b/state/tx_filter.go @@ -14,9 +14,3 @@ func TxPreCheck(state State) mempl.PreCheckFunc { ) return mempl.PreCheckAminoMaxBytes(maxDataBytes) } - -// TxPostCheck returns a function to filter transactions after processing. -// The function limits the gas wanted by a transaction to the block's maximum total gas. -func TxPostCheck(state State) mempl.PostCheckFunc { - return mempl.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas) -}