Skip to content

Commit

Permalink
add tests for handler reannounce local pending transactions
Browse files Browse the repository at this point in the history
Signed-off-by: Keefe-Liu <bianze.kernel@gmail.com>
  • Loading branch information
keefel committed Nov 22, 2021
1 parent 5a1f833 commit 3c4cb77
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 2 deletions.
53 changes: 53 additions & 0 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,59 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
}
}

// Tests that local pending transactions get propagated to peers.
func TestTransactionPendingReannounce(t *testing.T) {
t.Parallel()

// Create a source handler to announce transactions from and a sink handler
// to receive them.
source := newTestHandler()
defer source.close()

sink := newTestHandler()
defer sink.close()
sink.handler.acceptTxs = 1 // mark synced to accept transactions

sourcePipe, sinkPipe := p2p.MsgPipe()
defer sourcePipe.Close()
defer sinkPipe.Close()

sourcePeer := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{0}, "", nil), sourcePipe, source.txpool)
sinkPeer := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{0}, "", nil), sinkPipe, sink.txpool)
defer sourcePeer.Close()
defer sinkPeer.Close()

go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(source.handler), peer)
})
go sink.handler.runEthPeer(sinkPeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(sink.handler), peer)
})

// Subscribe transaction pools
txCh := make(chan core.NewTxsEvent, 1024)
sub := sink.txpool.SubscribeNewTxsEvent(txCh)
defer sub.Unsubscribe()

txs := make([]*types.Transaction, 64)
for nonce := range txs {
tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), nil)
tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey)

txs[nonce] = tx
}
source.txpool.ReannouceTransactions(txs)

for arrived := 0; arrived < len(txs); {
select {
case event := <-txCh:
arrived += len(event.Txs)
case <-time.NewTimer(time.Second).C:
t.Errorf("sink: transaction propagation timed out: have %d, want %d", arrived, len(txs))
}
}
}

// Tests that post eth protocol handshake, clients perform a mutual checkpoint
// challenge to validate each other's chains. Hash mismatches, or missing ones
// during a fast sync should lead to the peer getting dropped.
Expand Down
23 changes: 21 additions & 2 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ var (
type testTxPool struct {
pool map[common.Hash]*types.Transaction // Hash map of collected transactions

txFeed event.Feed // Notification feed to allow waiting for inclusion
lock sync.RWMutex // Protects the transaction pool
txFeed event.Feed // Notification feed to allow waiting for inclusion
reannoTxFeed event.Feed // Notification feed to trigger reannouce
lock sync.RWMutex // Protects the transaction pool
}

// newTestTxPool creates a mock transaction pool.
Expand Down Expand Up @@ -90,6 +91,18 @@ func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error {
return make([]error, len(txs))
}

// ReannouceTransactions announce the transactions to some peers.
func (p *testTxPool) ReannouceTransactions(txs []*types.Transaction) []error {
p.lock.Lock()
defer p.lock.Unlock()

for _, tx := range txs {
p.pool[tx.Hash()] = tx
}
p.reannoTxFeed.Send(core.ReannoTxsEvent{Txs: txs})
return make([]error, len(txs))
}

// Pending returns all the transactions known to the pool
func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) {
p.lock.RLock()
Expand All @@ -112,6 +125,12 @@ func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs
return p.txFeed.Subscribe(ch)
}

// SubscribeReannoTxsEvent should return an event subscription of ReannoTxsEvent and
// send events to the given channel.
func (p *testTxPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Subscription {
return p.reannoTxFeed.Subscribe(ch)
}

// testHandler is a live implementation of the Ethereum protocol handler, just
// preinitialized with some sane testing defaults and the transaction pool mocked
// out.
Expand Down

0 comments on commit 3c4cb77

Please sign in to comment.