Skip to content

Commit

Permalink
chore: remove mempool.postCheck (#158) (#181)
Browse files Browse the repository at this point in the history
* fix: error handling after check tx

* fix: typo

* chore: (mempool) remove postCheck and impl reserve

* chore: fix tests

* chore: revise log (remove checkTx.Code)

* chore: add `CONTRACT` for `mem.proxyAppConn.CheckTxAsync()`

* chore: revise numTxs, txsBytes for `ErrMempoolIsFull` in reserve()

* chore: revise to remove redundant `isFull()`

* fix: remove tx from cache when `app errors` or `failed to reserve`

* Revert "chore: revise to remove redundant `isFull()`"

This reverts commit 55990ec.

Co-authored-by: KIm, JinSan <jinsan@linecorp.com>
  • Loading branch information
wetcod and jinsan-line authored Feb 4, 2021
1 parent 629ddc8 commit f3e66ad
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 98 deletions.
2 changes: 1 addition & 1 deletion mempool/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestCacheAfterUpdate(t *testing.T) {
tx := types.Tx{byte(v)}
updateTxs = append(updateTxs, tx)
}
mempool.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil)
mempool.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil)

for _, v := range tc.reAddIndices {
tx := types.Tx{byte(v)}
Expand Down
84 changes: 51 additions & 33 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type CListMempool struct {
height int64 // the last block Update()'d to
txsBytes int64 // total size of mempool, in bytes

reserved int // the number of checking tx and it should be considered when checking mempool full
reservedBytes int64 // size of checking tx and it should be considered when checking mempool full
reservedMtx sync.Mutex

// notify listeners (ie. consensus) when txs are available
notifiedTxsAvailable bool
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
Expand All @@ -43,7 +47,6 @@ type CListMempool struct {
// CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods.
updateMtx sync.RWMutex
preCheck PreCheckFunc
postCheck PostCheckFunc

wal *auto.AutoFile // a log of mempool txs
txs *clist.CList // concurrent linked-list of good txs
Expand Down Expand Up @@ -118,12 +121,6 @@ func WithPreCheck(f PreCheckFunc) CListMempoolOption {
return func(mem *CListMempool) { mem.preCheck = f }
}

// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns
// false. This is ran after CheckTx.
func WithPostCheck(f PostCheckFunc) CListMempoolOption {
return func(mem *CListMempool) { mem.postCheck = f }
}

// WithMetrics sets the metrics.
func WithMetrics(metrics *Metrics) CListMempoolOption {
return func(mem *CListMempool) { mem.metrics = metrics }
Expand Down Expand Up @@ -283,9 +280,19 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx

// NOTE: proxyAppConn may error if tx buffer is full
if err := mem.proxyAppConn.Error(); err != nil {
// remove from cache
mem.cache.Remove(tx)
return err
}

// reserve mempool that should be called just before calling `mem.proxyAppConn.CheckTxAsync()`
if err := mem.reserve(int64(txSize)); err != nil {
// remove from cache
mem.cache.Remove(tx)
return err
}

// CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas)
reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb))

Expand Down Expand Up @@ -385,6 +392,35 @@ func (mem *CListMempool) isFull(txSize int) error {
return nil
}

func (mem *CListMempool) reserve(txSize int64) error {
mem.reservedMtx.Lock()
defer mem.reservedMtx.Unlock()

var (
memSize = mem.Size()
txsBytes = mem.TxsBytes()
)

if memSize+mem.reserved >= mem.config.Size || txSize+mem.reservedBytes+txsBytes > mem.config.MaxTxsBytes {
return ErrMempoolIsFull{
memSize + mem.reserved, mem.config.Size,
txsBytes + mem.reservedBytes, mem.config.MaxTxsBytes,
}
}

mem.reserved++
mem.reservedBytes += txSize
return nil
}

func (mem *CListMempool) releaseReserve(txSize int64) {
mem.reservedMtx.Lock()
defer mem.reservedMtx.Unlock()

mem.reserved--
mem.reservedBytes -= txSize
}

// callback, which is called after the app checked the tx for the first time.
//
// The case where the app checks the tx for the second and subsequent times is
Expand All @@ -397,20 +433,7 @@ func (mem *CListMempool) resCbFirstTime(
) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
}
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
// Check mempool isn't full again to reduce the chance of exceeding the
// limits.
if err := mem.isFull(len(tx)); err != nil {
// remove from cache (mempool might have a space later)
mem.cache.Remove(tx)
mem.logger.Error(err.Error())
return
}

if r.CheckTx.Code == abci.CodeTypeOK {
memTx := &mempoolTx{
height: mem.height,
gasWanted: r.CheckTx.GasWanted,
Expand All @@ -428,11 +451,14 @@ func (mem *CListMempool) resCbFirstTime(
} else {
// ignore bad transaction
mem.logger.Info("Rejected bad transaction",
"tx", txID(tx), "peerID", peerP2PID, "res", r, "err", postCheckErr)
"tx", txID(tx), "peerID", peerP2PID, "res", r)
mem.metrics.FailedTxs.Add(1)
// remove from cache (it might be good later)
mem.cache.Remove(tx)
}

// release `reserve` regardless it's OK or not (it might be good later)
mem.releaseReserve(int64(len(tx)))
default:
// ignore other messages
}
Expand All @@ -453,15 +479,11 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
memTx.tx,
tx))
}
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
}
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
if r.CheckTx.Code == abci.CodeTypeOK {
// Good, nothing to do.
} else {
// Tx became invalidated due to newly committed block.
mem.logger.Info("Tx is no longer valid", "tx", txID(tx), "res", r, "err", postCheckErr)
mem.logger.Info("Tx is no longer valid", "tx", txID(tx), "res", r)
// NOTE: we remove tx from the cache because it might be good later
mem.removeTx(tx, mem.recheckCursor, true)
}
Expand Down Expand Up @@ -555,13 +577,12 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
return txs
}

// Lock() must be help by the caller during execution.
// Lock() must be held by the caller during execution.
func (mem *CListMempool) Update(
height int64,
txs types.Txs,
deliverTxResponses []*abci.ResponseDeliverTx,
preCheck PreCheckFunc,
postCheck PostCheckFunc,
) error {
// Set height
mem.height = height
Expand All @@ -570,9 +591,6 @@ func (mem *CListMempool) Update(
if preCheck != nil {
mem.preCheck = preCheck
}
if postCheck != nil {
mem.postCheck = postCheck
}

for i, tx := range txs {
if deliverTxResponses[i].Code == abci.CodeTypeOK {
Expand Down
37 changes: 14 additions & 23 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,31 +147,22 @@ func TestMempoolFilters(t *testing.T) {
emptyTxArr := []types.Tx{[]byte{}}

nopPreFilter := func(tx types.Tx) error { return nil }
nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil }

// each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
// each tx has 20 bytes + amino overhead = 21 bytes, 1 gas
tests := []struct {
numTxsToCreate int
preFilter PreCheckFunc
postFilter PostCheckFunc
expectedNumTxs int
}{
{10, nopPreFilter, nopPostFilter, 10},
{10, PreCheckAminoMaxBytes(10), nopPostFilter, 0},
{10, PreCheckAminoMaxBytes(20), nopPostFilter, 0},
{10, PreCheckAminoMaxBytes(22), nopPostFilter, 10},
{10, nopPreFilter, PostCheckMaxGas(-1), 10},
{10, nopPreFilter, PostCheckMaxGas(0), 0},
{10, nopPreFilter, PostCheckMaxGas(1), 10},
{10, nopPreFilter, PostCheckMaxGas(3000), 10},
{10, PreCheckAminoMaxBytes(10), PostCheckMaxGas(20), 0},
{10, PreCheckAminoMaxBytes(30), PostCheckMaxGas(20), 10},
{10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(1), 10},
{10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(0), 0},
{10, nopPreFilter, 10},
{10, PreCheckAminoMaxBytes(10), 0},
{10, PreCheckAminoMaxBytes(20), 0},
{10, PreCheckAminoMaxBytes(22), 10},
{10, PreCheckAminoMaxBytes(30), 10},
}
for tcIndex, tt := range tests {
mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter)
mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter)
checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID)
require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex)
mempool.Flush()
Expand All @@ -186,7 +177,7 @@ func TestMempoolUpdate(t *testing.T) {

// 1. Adds valid txs to the cache
{
mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil)
err := mempool.CheckTx([]byte{0x01}, nil, TxInfo{})
if assert.Error(t, err) {
assert.Equal(t, ErrTxInCache, err)
Expand All @@ -197,15 +188,15 @@ func TestMempoolUpdate(t *testing.T) {
{
err := mempool.CheckTx([]byte{0x02}, nil, TxInfo{})
require.NoError(t, err)
mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil)
assert.Zero(t, mempool.Size())
}

// 3. Removes invalid transactions from the cache and the mempool (if present)
{
err := mempool.CheckTx([]byte{0x03}, nil, TxInfo{})
require.NoError(t, err)
mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil)
mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil)
assert.Zero(t, mempool.Size())

err = mempool.CheckTx([]byte{0x03}, nil, TxInfo{})
Expand Down Expand Up @@ -234,7 +225,7 @@ func TestTxsAvailable(t *testing.T) {
// it should fire once now for the new height
// since there are still txs left
committedTxs, txs := txs[:50], txs[50:]
if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
Expand All @@ -246,7 +237,7 @@ func TestTxsAvailable(t *testing.T) {

// now call update with all the txs. it should not fire as there are no txs left
committedTxs = append(txs, moreTxs...) //nolint: gocritic
if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
Expand Down Expand Up @@ -305,7 +296,7 @@ func TestSerialReap(t *testing.T) {
binary.BigEndian.PutUint64(txBytes, uint64(i))
txs = append(txs, txBytes)
}
if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil {
if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
}
Expand Down Expand Up @@ -489,7 +480,7 @@ func TestMempoolTxsBytes(t *testing.T) {
assert.EqualValues(t, 1, mempool.TxsBytes())

// 3. zero again after tx is removed by Update
mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil)
assert.EqualValues(t, 0, mempool.TxsBytes())

// 4. zero after Flush
Expand Down Expand Up @@ -534,7 +525,7 @@ func TestMempoolTxsBytes(t *testing.T) {
require.NotEmpty(t, res2.Data)

// Pretend like we committed nothing so txBytes gets rechecked and removed.
mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil, nil)
mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil)
assert.EqualValues(t, 0, mempool.TxsBytes())
}

Expand Down
25 changes: 0 additions & 25 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type Mempool interface {
blockTxs types.Txs,
deliverTxResponses []*abci.ResponseDeliverTx,
newPreFn PreCheckFunc,
newPostFn PostCheckFunc,
) error

// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
Expand Down Expand Up @@ -85,11 +84,6 @@ type Mempool interface {
// transaction doesn't exceeded the block size.
type PreCheckFunc func(types.Tx) error

// PostCheckFunc is an optional filter executed after CheckTx and rejects
// transaction if false is returned. An example would be to ensure a
// transaction doesn't require more gas than available for the block.
type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error

// TxInfo are parameters that get passed when attempting to add a tx to the
// mempool.
type TxInfo struct {
Expand Down Expand Up @@ -120,22 +114,3 @@ func PreCheckAminoMaxBytes(maxBytes int64) PreCheckFunc {
return nil
}
}

// PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed
// maxGas. Returns nil if maxGas is -1.
func PostCheckMaxGas(maxGas int64) PostCheckFunc {
return func(tx types.Tx, res *abci.ResponseCheckTx) error {
if maxGas == -1 {
return nil
}
if res.GasWanted < 0 {
return fmt.Errorf("gas wanted %d is negative",
res.GasWanted)
}
if res.GasWanted > maxGas {
return fmt.Errorf("gas wanted %d is greater than max gas %d",
res.GasWanted, maxGas)
}
return nil
}
}
1 change: 0 additions & 1 deletion mock/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func (Mempool) Update(
_ types.Txs,
_ []*abci.ResponseDeliverTx,
_ mempl.PreCheckFunc,
_ mempl.PostCheckFunc,
) error {
return nil
}
Expand Down
1 change: 0 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempoolLogger := logger.With("module", "mempool")
mempoolReactor := mempl.NewReactor(config.Mempool, mempool)
Expand Down
1 change: 0 additions & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ func TestCreateProposalBlock(t *testing.T) {
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempool.SetLogger(logger)

Expand Down
8 changes: 1 addition & 7 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,7 @@ func (blockExec *BlockExecutor) Commit(

// Update mempool.
updateMempoolStartTime := time.Now().UnixNano()
err = blockExec.mempool.Update(
block.Height,
block.Txs,
deliverTxResponses,
TxPreCheck(state),
TxPostCheck(state),
)
err = blockExec.mempool.Update(block.Height, block.Txs, deliverTxResponses, TxPreCheck(state))
updateMempoolEndTime := time.Now().UnixNano()

updateMempoolTimeMs := float64(updateMempoolEndTime-updateMempoolStartTime) / 1000000
Expand Down
6 changes: 0 additions & 6 deletions state/tx_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,3 @@ func TxPreCheck(state State) mempl.PreCheckFunc {
)
return mempl.PreCheckAminoMaxBytes(maxDataBytes)
}

// TxPostCheck returns a function to filter transactions after processing.
// The function limits the gas wanted by a transaction to the block's maximum total gas.
func TxPostCheck(state State) mempl.PostCheckFunc {
return mempl.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)
}

0 comments on commit f3e66ad

Please sign in to comment.