From d201e6049987cd2c3c83cf3d315d969f0f2a7654 Mon Sep 17 00:00:00 2001 From: "larry.lx" Date: Wed, 6 Sep 2023 14:48:31 +0800 Subject: [PATCH 1/3] fetcher: no import blocks before or equal to the finalized height --- eth/fetcher/block_fetcher.go | 92 +++++++++++++++++++------------ eth/fetcher/block_fetcher_test.go | 77 +++++++++++++++++++++++++- eth/handler.go | 6 +- 3 files changed, 138 insertions(+), 37 deletions(-) diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index 7953e5e695..f537a775c8 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -94,6 +94,9 @@ type blockBroadcasterFn func(block *types.Block, propagate bool) // chainHeightFn is a callback type to retrieve the current chain height. type chainHeightFn func() uint64 +// chainFinalizedHeightFn is a callback type to retrieve the current chain finalized height. +type chainFinalizedHeightFn func() uint64 + // headersInsertFn is a callback type to insert a batch of headers into the local chain. type headersInsertFn func(headers []*types.Header) (int, error) @@ -189,14 +192,15 @@ type BlockFetcher struct { queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) // Callbacks - getHeader HeaderRetrievalFn // Retrieves a header from the local chain - getBlock blockRetrievalFn // Retrieves a block from the local chain - verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work - broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers - chainHeight chainHeightFn // Retrieves the current chain's height - insertHeaders headersInsertFn // Injects a batch of headers into the chain - insertChain chainInsertFn // Injects a batch of blocks into the chain - dropPeer peerDropFn // Drops a peer for misbehaving + getHeader HeaderRetrievalFn // Retrieves a header from the local chain + getBlock blockRetrievalFn // Retrieves a block from the local chain + verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work + broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers + chainHeight chainHeightFn // Retrieves the current chain's height + chainFinalizedHeight chainFinalizedHeightFn // Retrieves the current chain's finalized height + insertHeaders headersInsertFn // Injects a batch of headers into the chain + insertChain chainInsertFn // Injects a batch of blocks into the chain + dropPeer peerDropFn // Drops a peer for misbehaving // Testing hooks announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list @@ -207,32 +211,35 @@ type BlockFetcher struct { } // NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements. -func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher { +func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, + broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, chainFinalizedHeight chainFinalizedHeightFn, + insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher { return &BlockFetcher{ - light: light, - notify: make(chan *blockAnnounce), - inject: make(chan *blockOrHeaderInject), - headerFilter: make(chan chan *headerFilterTask), - bodyFilter: make(chan chan *bodyFilterTask), - done: make(chan common.Hash), - quit: make(chan struct{}), - requeue: make(chan *blockOrHeaderInject), - announces: make(map[string]int), - announced: make(map[common.Hash][]*blockAnnounce), - fetching: make(map[common.Hash]*blockAnnounce), - fetched: make(map[common.Hash][]*blockAnnounce), - completing: make(map[common.Hash]*blockAnnounce), - queue: prque.New(nil), - queues: make(map[string]int), - queued: make(map[common.Hash]*blockOrHeaderInject), - getHeader: getHeader, - getBlock: getBlock, - verifyHeader: verifyHeader, - broadcastBlock: broadcastBlock, - chainHeight: chainHeight, - insertHeaders: insertHeaders, - insertChain: insertChain, - dropPeer: dropPeer, + light: light, + notify: make(chan *blockAnnounce), + inject: make(chan *blockOrHeaderInject), + headerFilter: make(chan chan *headerFilterTask), + bodyFilter: make(chan chan *bodyFilterTask), + done: make(chan common.Hash), + quit: make(chan struct{}), + requeue: make(chan *blockOrHeaderInject), + announces: make(map[string]int), + announced: make(map[common.Hash][]*blockAnnounce), + fetching: make(map[common.Hash]*blockAnnounce), + fetched: make(map[common.Hash][]*blockAnnounce), + completing: make(map[common.Hash]*blockAnnounce), + queue: prque.New(nil), + queues: make(map[string]int), + queued: make(map[common.Hash]*blockOrHeaderInject), + getHeader: getHeader, + getBlock: getBlock, + verifyHeader: verifyHeader, + broadcastBlock: broadcastBlock, + chainHeight: chainHeight, + chainFinalizedHeight: chainFinalizedHeight, + insertHeaders: insertHeaders, + insertChain: insertChain, + dropPeer: dropPeer, } } @@ -361,6 +368,7 @@ func (f *BlockFetcher) loop() { } // Import any queued blocks that could potentially fit height := f.chainHeight() + finalizedHeight := f.chainFinalizedHeight() for !f.queue.Empty() { op := f.queue.PopItem().(*blockOrHeaderInject) hash := op.hash() @@ -377,7 +385,7 @@ func (f *BlockFetcher) loop() { break } // Otherwise if fresh and still unknown, try and import - if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) { + if (number+maxUncleDist < height) || number <= finalizedHeight || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) { f.forgetBlock(hash) continue } @@ -408,7 +416,13 @@ func (f *BlockFetcher) loop() { } // If we have a valid block number, check that it's potentially useful if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { - log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist) + log.Debug("Peer discarded announcement by distance", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist) + blockAnnounceDropMeter.Mark(1) + break + } + finalized := f.chainFinalizedHeight() + if notification.number <= finalized { + log.Debug("Peer discarded announcement by finality", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "finalized", finalized) blockAnnounceDropMeter.Mark(1) break } @@ -814,6 +828,14 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B f.forgetHash(hash) return } + // Discard any block that is below the current finalized height + finalizedHeight := f.chainFinalizedHeight() + if number <= finalizedHeight { + log.Debug("Discarded delivered header or block, below or equal to finalized", "peer", peer, "number", number, "hash", hash, "finalized", finalizedHeight) + blockBroadcastDropMeter.Mark(1) + f.forgetHash(hash) + return + } // Schedule the block for future importing if _, ok := f.queued[hash]; !ok { op := &blockOrHeaderInject{origin: peer} diff --git a/eth/fetcher/block_fetcher_test.go b/eth/fetcher/block_fetcher_test.go index a7e1a2ffb1..23e59a9079 100644 --- a/eth/fetcher/block_fetcher_test.go +++ b/eth/fetcher/block_fetcher_test.go @@ -96,7 +96,9 @@ func newTester(light bool) *fetcherTester { blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, drops: make(map[string]bool), } - tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer) + tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, + tester.broadcastBlock, tester.chainHeight, tester.chainFinalizedHeight, tester.insertHeaders, + tester.insertChain, tester.dropPeer) tester.fetcher.Start() return tester @@ -138,6 +140,18 @@ func (f *fetcherTester) chainHeight() uint64 { return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() } +func (f *fetcherTester) chainFinalizedHeight() uint64 { + f.lock.RLock() + defer f.lock.RUnlock() + if len(f.hashes) < 3 { + return 0 + } + if f.fetcher.light { + return f.headers[f.hashes[len(f.hashes)-3]].Number.Uint64() + } + return f.blocks[f.hashes[len(f.hashes)-3]].NumberU64() +} + // insertChain injects a new headers into the simulated chain. func (f *fetcherTester) insertHeaders(headers []*types.Header) (int, error) { f.lock.Lock() @@ -730,6 +744,67 @@ func testDistantAnnouncementDiscarding(t *testing.T, light bool) { } } +// Tests that announcements with numbers much lower or equal to the current finalized block +// head get discarded to prevent wasting resources on useless blocks from faulty peers. +func TestFullFinalizedAnnouncementDiscarding(t *testing.T) { + testFinalizedAnnouncementDiscarding(t, false) +} +func TestLightFinalizedAnnouncementDiscarding(t *testing.T) { + testFinalizedAnnouncementDiscarding(t, true) +} + +func testFinalizedAnnouncementDiscarding(t *testing.T, light bool) { + // Create a long chain to import and define the discard boundaries + hashes, blocks := makeChain(3*maxQueueDist, 0, genesis) + + head := hashes[len(hashes)/2] + justified := hashes[len(hashes)/2+1] + finalized := hashes[len(hashes)/2+2] + beforeFinalized := hashes[len(hashes)/2+3] + + low, equal := len(hashes)/2+3, len(hashes)/2+2 + + // Create a tester and simulate a head block being the middle of the above chain + tester := newTester(light) + + tester.lock.Lock() + tester.hashes = []common.Hash{beforeFinalized, finalized, justified, head} + tester.headers = map[common.Hash]*types.Header{ + beforeFinalized: blocks[beforeFinalized].Header(), + finalized: blocks[finalized].Header(), + justified: blocks[justified].Header(), + head: blocks[head].Header(), + } + tester.blocks = map[common.Hash]*types.Block{ + beforeFinalized: blocks[beforeFinalized], + finalized: blocks[finalized], + justified: blocks[justified], + head: blocks[head], + } + tester.lock.Unlock() + + headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0) + + fetching := make(chan struct{}, 2) + tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} } + + // Ensure that a block with a lower number than the threshold is discarded + tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher, nil) + select { + case <-time.After(50 * time.Millisecond): + case <-fetching: + t.Fatalf("fetcher requested stale header") + } + // Ensure that a block with a higher number than the threshold is discarded + tester.fetcher.Notify("higher", hashes[equal], blocks[hashes[equal]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher, nil) + select { + case <-time.After(50 * time.Millisecond): + case <-fetching: + t.Fatalf("fetcher requested future header") + } +} + // Tests that peers announcing blocks with invalid numbers (i.e. not matching // the headers provided afterwards) get dropped as malicious. func TestFullInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, false) } diff --git a/eth/handler.go b/eth/handler.go index 88afa3afef..23a4ac3755 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -259,6 +259,9 @@ func newHandler(config *handlerConfig) (*handler, error) { heighter := func() uint64 { return h.chain.CurrentBlock().NumberU64() } + finalizeHeighter := func() uint64 { + return h.chain.CurrentFinalBlock().Number.Uint64() + } inserter := func(blocks types.Blocks) (int, error) { // All the block fetcher activities should be disabled // after the transition. Print the warning log. @@ -322,7 +325,8 @@ func newHandler(config *handlerConfig) (*handler, error) { } return n, err } - h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer) + h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, + heighter, finalizeHeighter, nil, inserter, h.removePeer) fetchTx := func(peer string, hashes []common.Hash) error { p := h.peers.peer(peer) From f08b2d805b25d330f4ab5464d916343088d1a23f Mon Sep 17 00:00:00 2001 From: "larry.lx" Date: Wed, 6 Sep 2023 16:54:45 +0800 Subject: [PATCH 2/3] UT: fix UT crash in finalizeHeighter --- eth/handler.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/eth/handler.go b/eth/handler.go index 23a4ac3755..316c358a25 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -260,7 +260,11 @@ func newHandler(config *handlerConfig) (*handler, error) { return h.chain.CurrentBlock().NumberU64() } finalizeHeighter := func() uint64 { - return h.chain.CurrentFinalBlock().Number.Uint64() + fblock := h.chain.CurrentFinalBlock() + if fblock == nil { + return 0 + } + return fblock.Number.Uint64() } inserter := func(blocks types.Blocks) (int, error) { // All the block fetcher activities should be disabled From 3973bf256e23d0f7caaff5fa853186f549209448 Mon Sep 17 00:00:00 2001 From: "larry.lx" Date: Wed, 6 Sep 2023 18:38:19 +0800 Subject: [PATCH 3/3] comments: update the comments --- eth/fetcher/block_fetcher_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eth/fetcher/block_fetcher_test.go b/eth/fetcher/block_fetcher_test.go index 23e59a9079..79f4e4f40d 100644 --- a/eth/fetcher/block_fetcher_test.go +++ b/eth/fetcher/block_fetcher_test.go @@ -789,15 +789,15 @@ func testFinalizedAnnouncementDiscarding(t *testing.T, light bool) { fetching := make(chan struct{}, 2) tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} } - // Ensure that a block with a lower number than the threshold is discarded + // Ensure that a block with a lower number than the finalized height is discarded tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher, nil) select { case <-time.After(50 * time.Millisecond): case <-fetching: t.Fatalf("fetcher requested stale header") } - // Ensure that a block with a higher number than the threshold is discarded - tester.fetcher.Notify("higher", hashes[equal], blocks[hashes[equal]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher, nil) + // Ensure that a block with a same number of the finalized height is discarded + tester.fetcher.Notify("equal", hashes[equal], blocks[hashes[equal]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher, nil) select { case <-time.After(50 * time.Millisecond): case <-fetching: