Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove mempool.postCheck (#158) #217

Merged
merged 1 commit into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func (emptyMempool) Update(
_ *types.Block,
_ []*abci.ResponseDeliverTx,
_ mempl.PreCheckFunc,
_ mempl.PostCheckFunc,
) error {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion mempool/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestCacheAfterUpdate(t *testing.T) {
updateTxs = append(updateTxs, tx)
}
err := mempool.Update(newTestBlock(int64(tcIndex), updateTxs),
abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil)
abciResponses(len(updateTxs), abci.CodeTypeOK), nil)
require.NoError(t, err)

for _, v := range tc.reAddIndices {
Expand Down
83 changes: 49 additions & 34 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type CListMempool struct {
height int64 // the last block Update()'d to
txsBytes int64 // total size of mempool, in bytes

reserved int // the number of checking tx and it should be considered when checking mempool full
reservedBytes int64 // size of checking tx and it should be considered when checking mempool full
reservedMtx sync.Mutex

// notify listeners (ie. consensus) when txs are available
notifiedTxsAvailable bool
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
Expand All @@ -50,7 +54,6 @@ type CListMempool struct {
// CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods.
updateMtx tmsync.RWMutex
preCheck PreCheckFunc
postCheck PostCheckFunc

wal *auto.AutoFile // a log of mempool txs
txs *clist.CList // concurrent linked-list of good txs
Expand Down Expand Up @@ -126,13 +129,6 @@ func WithPreCheck(f PreCheckFunc) CListMempoolOption {
return func(mem *CListMempool) { mem.preCheck = f }
}

// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns
// false. This is ran after CheckTx. Only applies to the first created block.
// After that, Update overwrites the existing value.
func WithPostCheck(f PostCheckFunc) CListMempoolOption {
return func(mem *CListMempool) { mem.postCheck = f }
}

// WithMetrics sets the metrics.
func WithMetrics(metrics *Metrics) CListMempoolOption {
return func(mem *CListMempool) { mem.metrics = metrics }
Expand Down Expand Up @@ -282,6 +278,14 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
return ErrTxInCache
}

// reserve mempool that should be called just before calling `mem.proxyAppConn.CheckTxAsync()`
if err := mem.reserve(int64(txSize)); err != nil {
// remove from cache
mem.cache.Remove(tx)
return err
}

// CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas)
reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb))

Expand Down Expand Up @@ -391,6 +395,35 @@ func (mem *CListMempool) isFull(txSize int) error {
return nil
}

func (mem *CListMempool) reserve(txSize int64) error {
mem.reservedMtx.Lock()
defer mem.reservedMtx.Unlock()

var (
memSize = mem.Size()
txsBytes = mem.TxsBytes()
)

if memSize+mem.reserved >= mem.config.Size || txSize+mem.reservedBytes+txsBytes > mem.config.MaxTxsBytes {
return ErrMempoolIsFull{
memSize + mem.reserved, mem.config.Size,
txsBytes + mem.reservedBytes, mem.config.MaxTxsBytes,
}
}

mem.reserved++
mem.reservedBytes += txSize
return nil
}

func (mem *CListMempool) releaseReserve(txSize int64) {
mem.reservedMtx.Lock()
defer mem.reservedMtx.Unlock()

mem.reserved--
mem.reservedBytes -= txSize
}

// callback, which is called after the app checked the tx for the first time.
//
// The case where the app checks the tx for the second and subsequent times is
Expand All @@ -403,20 +436,7 @@ func (mem *CListMempool) resCbFirstTime(
) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
}
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
// Check mempool isn't full again to reduce the chance of exceeding the
// limits.
if err := mem.isFull(len(tx)); err != nil {
// remove from cache (mempool might have a space later)
mem.cache.Remove(tx)
mem.logger.Error(err.Error())
return
}

if r.CheckTx.Code == abci.CodeTypeOK {
memTx := &mempoolTx{
height: mem.height,
gasWanted: r.CheckTx.GasWanted,
Expand All @@ -434,13 +454,16 @@ func (mem *CListMempool) resCbFirstTime(
} else {
// ignore bad transaction
mem.logger.Debug("rejected bad transaction",
"tx", txID(tx), "peerID", peerP2PID, "res", r, "err", postCheckErr)
"tx", txID(tx), "peerID", peerP2PID, "res", r)
mem.metrics.FailedTxs.Add(1)
if !mem.config.KeepInvalidTxsInCache {
// remove from cache (it might be good later)
mem.cache.Remove(tx)
}
}

// release `reserve` regardless it's OK or not (it might be good later)
mem.releaseReserve(int64(len(tx)))
default:
// ignore other messages
}
Expand All @@ -461,15 +484,11 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
memTx.tx,
tx))
}
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
}
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
if r.CheckTx.Code == abci.CodeTypeOK {
// Good, nothing to do.
} else {
// Tx became invalidated due to newly committed block.
mem.logger.Debug("tx is no longer valid", "tx", txID(tx), "res", r, "err", postCheckErr)
mem.logger.Debug("tx is no longer valid", "tx", txID(tx), "res", r)
// NOTE: we remove tx from the cache because it might be good later
mem.removeTx(tx, mem.recheckCursor, !mem.config.KeepInvalidTxsInCache)
}
Expand Down Expand Up @@ -562,12 +581,11 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
return txs
}

// Lock() must be help by the caller during execution.
// Lock() must be held by the caller during execution.
func (mem *CListMempool) Update(
block *types.Block,
deliverTxResponses []*abci.ResponseDeliverTx,
preCheck PreCheckFunc,
postCheck PostCheckFunc,
) error {
// Set height
mem.height = block.Height
Expand All @@ -576,9 +594,6 @@ func (mem *CListMempool) Update(
if preCheck != nil {
mem.preCheck = preCheck
}
if postCheck != nil {
mem.postCheck = postCheck
}

for i, tx := range block.Txs {
if deliverTxResponses[i].Code == abci.CodeTypeOK {
Expand Down
38 changes: 15 additions & 23 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,31 +147,23 @@ func TestMempoolFilters(t *testing.T) {
emptyTxArr := []types.Tx{[]byte{}}

nopPreFilter := func(tx types.Tx) error { return nil }
nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil }

// each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
// each tx has 20 bytes
tests := []struct {
numTxsToCreate int
preFilter PreCheckFunc
postFilter PostCheckFunc
expectedNumTxs int
}{
{10, nopPreFilter, nopPostFilter, 10},
{10, PreCheckMaxBytes(10), nopPostFilter, 0},
{10, PreCheckMaxBytes(22), nopPostFilter, 10},
{10, nopPreFilter, PostCheckMaxGas(-1), 10},
{10, nopPreFilter, PostCheckMaxGas(0), 0},
{10, nopPreFilter, PostCheckMaxGas(1), 10},
{10, nopPreFilter, PostCheckMaxGas(3000), 10},
{10, PreCheckMaxBytes(10), PostCheckMaxGas(20), 0},
{10, PreCheckMaxBytes(30), PostCheckMaxGas(20), 10},
{10, PreCheckMaxBytes(22), PostCheckMaxGas(1), 10},
{10, PreCheckMaxBytes(22), PostCheckMaxGas(0), 0},
{10, nopPreFilter, 10},
{10, PreCheckMaxBytes(10), 0},
{10, PreCheckMaxBytes(20), 0},
{10, PreCheckMaxBytes(22), 10},
{10, PreCheckMaxBytes(30), 10},
}
for tcIndex, tt := range tests {
err := mempool.Update(newTestBlock(1, emptyTxArr),
abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter)
abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter)
require.NoError(t, err)
checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID)
require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex)
Expand All @@ -188,7 +180,7 @@ func TestMempoolUpdate(t *testing.T) {
// 1. Adds valid txs to the cache
{
err := mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x01}}),
abciResponses(1, abci.CodeTypeOK), nil, nil)
abciResponses(1, abci.CodeTypeOK), nil)
require.NoError(t, err)
err = mempool.CheckTx([]byte{0x01}, nil, TxInfo{})
if assert.Error(t, err) {
Expand All @@ -200,7 +192,7 @@ func TestMempoolUpdate(t *testing.T) {
{
err := mempool.CheckTx([]byte{0x02}, nil, TxInfo{})
require.NoError(t, err)
err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x02}}), abciResponses(1, abci.CodeTypeOK), nil, nil)
err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x02}}), abciResponses(1, abci.CodeTypeOK), nil)
require.NoError(t, err)
assert.Zero(t, mempool.Size())
}
Expand All @@ -209,7 +201,7 @@ func TestMempoolUpdate(t *testing.T) {
{
err := mempool.CheckTx([]byte{0x03}, nil, TxInfo{})
require.NoError(t, err)
err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x03}}), abciResponses(1, 1), nil, nil)
err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x03}}), abciResponses(1, 1), nil)
require.NoError(t, err)
assert.Zero(t, mempool.Size())

Expand Down Expand Up @@ -241,7 +233,7 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
_ = app.DeliverTx(abci.RequestDeliverTx{Tx: a})
_ = app.DeliverTx(abci.RequestDeliverTx{Tx: b})
err = mempool.Update(newTestBlock(1, []types.Tx{a, b}),
[]*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}, {Code: 2}}, nil, nil)
[]*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}, {Code: 2}}, nil)
require.NoError(t, err)

// a must be added to the cache
Expand Down Expand Up @@ -297,7 +289,7 @@ func TestTxsAvailable(t *testing.T) {
// since there are still txs left
committedTxs, txs := txs[:50], txs[50:]
if err := mempool.Update(newTestBlock(1, committedTxs),
abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
Expand All @@ -310,7 +302,7 @@ func TestTxsAvailable(t *testing.T) {
// now call update with all the txs. it should not fire as there are no txs left
committedTxs = append(txs, moreTxs...) //nolint: gocritic
if err := mempool.Update(newTestBlock(2, committedTxs),
abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
Expand Down Expand Up @@ -370,7 +362,7 @@ func TestSerialReap(t *testing.T) {
txs = append(txs, txBytes)
}
if err := mempool.Update(newTestBlock(0, txs),
abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil {
abciResponses(len(txs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
}
Expand Down Expand Up @@ -542,7 +534,7 @@ func TestMempoolTxsBytes(t *testing.T) {

// 3. zero again after tx is removed by Update
err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x01}}),
abciResponses(1, abci.CodeTypeOK), nil, nil)
abciResponses(1, abci.CodeTypeOK), nil)
require.NoError(t, err)
assert.EqualValues(t, 0, mempool.TxsBytes())

Expand Down Expand Up @@ -592,7 +584,7 @@ func TestMempoolTxsBytes(t *testing.T) {
require.NotEmpty(t, res2.Data)

// Pretend like we committed nothing so txBytes gets rechecked and removed.
err = mempool.Update(newTestBlock(1, []types.Tx{}), abciResponses(0, abci.CodeTypeOK), nil, nil)
err = mempool.Update(newTestBlock(1, []types.Tx{}), abciResponses(0, abci.CodeTypeOK), nil)
require.NoError(t, err)
assert.EqualValues(t, 0, mempool.TxsBytes())

Expand Down
25 changes: 0 additions & 25 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type Mempool interface {
block *types.Block,
deliverTxResponses []*abci.ResponseDeliverTx,
newPreFn PreCheckFunc,
newPostFn PostCheckFunc,
) error

// Flush removes all transactions from the mempool and cache
Expand Down Expand Up @@ -79,11 +78,6 @@ type Mempool interface {
// transaction doesn't exceeded the block size.
type PreCheckFunc func(types.Tx) error

// PostCheckFunc is an optional filter executed after CheckTx and rejects
// transaction if false is returned. An example would be to ensure a
// transaction doesn't require more gas than available for the block.
type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error

// TxInfo are parameters that get passed when attempting to add a tx to the
// mempool.
type TxInfo struct {
Expand All @@ -108,22 +102,3 @@ func PreCheckMaxBytes(maxBytes int64) PreCheckFunc {
return nil
}
}

// PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed
// maxGas. Returns nil if maxGas is -1.
func PostCheckMaxGas(maxGas int64) PostCheckFunc {
return func(tx types.Tx, res *abci.ResponseCheckTx) error {
if maxGas == -1 {
return nil
}
if res.GasWanted < 0 {
return fmt.Errorf("gas wanted %d is negative",
res.GasWanted)
}
if res.GasWanted > maxGas {
return fmt.Errorf("gas wanted %d is greater than max gas %d",
res.GasWanted, maxGas)
}
return nil
}
}
1 change: 0 additions & 1 deletion mempool/mock/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func (Mempool) Update(
_ *types.Block,
_ []*abci.ResponseDeliverTx,
_ mempl.PreCheckFunc,
_ mempl.PostCheckFunc,
) error {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestReactorConcurrency(t *testing.T) {
for i := range txs {
deliverTxResponses[i] = &abci.ResponseDeliverTx{Code: 0}
}
err := reactors[0].mempool.Update(newTestBlock(1, txs), deliverTxResponses, nil, nil)
err := reactors[0].mempool.Update(newTestBlock(1, txs), deliverTxResponses, nil)
assert.NoError(t, err)
}()

Expand All @@ -115,7 +115,7 @@ func TestReactorConcurrency(t *testing.T) {
reactors[1].mempool.Lock()
defer reactors[1].mempool.Unlock()
err := reactors[1].mempool.Update(newTestBlock(1, []types.Tx{}),
make([]*abci.ResponseDeliverTx, 0), nil, nil)
make([]*abci.ResponseDeliverTx, 0), nil)
assert.NoError(t, err)
}()

Expand Down
1 change: 0 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,6 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempoolLogger := logger.With("module", "mempool")
mempoolReactor := mempl.NewReactor(config.Mempool, mempool)
Expand Down
2 changes: 0 additions & 2 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ func TestCreateProposalBlock(t *testing.T) {
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempool.SetLogger(logger)

Expand Down Expand Up @@ -341,7 +340,6 @@ func TestMaxProposalBlockSize(t *testing.T) {
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempool.SetLogger(logger)

Expand Down
Loading