Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fetcher: no import blocks before or equal to the finalized height #1854

Merged
merged 3 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 57 additions & 35 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ type blockBroadcasterFn func(block *types.Block, propagate bool)
// chainHeightFn is a callback type to retrieve the current chain height.
type chainHeightFn func() uint64

// chainFinalizedHeightFn is a callback type to retrieve the current chain finalized height.
type chainFinalizedHeightFn func() uint64

// headersInsertFn is a callback type to insert a batch of headers into the local chain.
type headersInsertFn func(headers []*types.Header) (int, error)

Expand Down Expand Up @@ -189,14 +192,15 @@ type BlockFetcher struct {
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)

// Callbacks
getHeader HeaderRetrievalFn // Retrieves a header from the local chain
getBlock blockRetrievalFn // Retrieves a block from the local chain
verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
chainHeight chainHeightFn // Retrieves the current chain's height
insertHeaders headersInsertFn // Injects a batch of headers into the chain
insertChain chainInsertFn // Injects a batch of blocks into the chain
dropPeer peerDropFn // Drops a peer for misbehaving
getHeader HeaderRetrievalFn // Retrieves a header from the local chain
getBlock blockRetrievalFn // Retrieves a block from the local chain
verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
chainHeight chainHeightFn // Retrieves the current chain's height
chainFinalizedHeight chainFinalizedHeightFn // Retrieves the current chain's finalized height
insertHeaders headersInsertFn // Injects a batch of headers into the chain
insertChain chainInsertFn // Injects a batch of blocks into the chain
dropPeer peerDropFn // Drops a peer for misbehaving

// Testing hooks
announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list
Expand All @@ -207,32 +211,35 @@ type BlockFetcher struct {
}

// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn,
broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, chainFinalizedHeight chainFinalizedHeightFn,
insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
return &BlockFetcher{
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
requeue: make(chan *blockOrHeaderInject),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New(nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
requeue: make(chan *blockOrHeaderInject),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New(nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
chainFinalizedHeight: chainFinalizedHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
}
}

Expand Down Expand Up @@ -361,6 +368,7 @@ func (f *BlockFetcher) loop() {
}
// Import any queued blocks that could potentially fit
height := f.chainHeight()
finalizedHeight := f.chainFinalizedHeight()
for !f.queue.Empty() {
op := f.queue.PopItem().(*blockOrHeaderInject)
hash := op.hash()
Expand All @@ -377,7 +385,7 @@ func (f *BlockFetcher) loop() {
break
}
// Otherwise if fresh and still unknown, try and import
if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
if (number+maxUncleDist < height) || number <= finalizedHeight || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
f.forgetBlock(hash)
continue
}
Expand Down Expand Up @@ -408,7 +416,13 @@ func (f *BlockFetcher) loop() {
}
// If we have a valid block number, check that it's potentially useful
if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
log.Debug("Peer discarded announcement by distance", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
blockAnnounceDropMeter.Mark(1)
break
}
finalized := f.chainFinalizedHeight()
if notification.number <= finalized {
log.Debug("Peer discarded announcement by finality", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "finalized", finalized)
blockAnnounceDropMeter.Mark(1)
break
}
Expand Down Expand Up @@ -814,6 +828,14 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B
f.forgetHash(hash)
return
}
// Discard any block that is below the current finalized height
finalizedHeight := f.chainFinalizedHeight()
if number <= finalizedHeight {
log.Debug("Discarded delivered header or block, below or equal to finalized", "peer", peer, "number", number, "hash", hash, "finalized", finalizedHeight)
blockBroadcastDropMeter.Mark(1)
f.forgetHash(hash)
return
}
// Schedule the block for future importing
if _, ok := f.queued[hash]; !ok {
op := &blockOrHeaderInject{origin: peer}
Expand Down
77 changes: 76 additions & 1 deletion eth/fetcher/block_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ func newTester(light bool) *fetcherTester {
blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
drops: make(map[string]bool),
}
tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer)
tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader,
tester.broadcastBlock, tester.chainHeight, tester.chainFinalizedHeight, tester.insertHeaders,
tester.insertChain, tester.dropPeer)
tester.fetcher.Start()

return tester
Expand Down Expand Up @@ -138,6 +140,18 @@ func (f *fetcherTester) chainHeight() uint64 {
return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64()
}

func (f *fetcherTester) chainFinalizedHeight() uint64 {
f.lock.RLock()
defer f.lock.RUnlock()
if len(f.hashes) < 3 {
return 0
}
if f.fetcher.light {
return f.headers[f.hashes[len(f.hashes)-3]].Number.Uint64()
}
return f.blocks[f.hashes[len(f.hashes)-3]].NumberU64()
}

// insertChain injects a new headers into the simulated chain.
func (f *fetcherTester) insertHeaders(headers []*types.Header) (int, error) {
f.lock.Lock()
Expand Down Expand Up @@ -730,6 +744,67 @@ func testDistantAnnouncementDiscarding(t *testing.T, light bool) {
}
}

// Tests that announcements with numbers much lower or equal to the current finalized block
// head get discarded to prevent wasting resources on useless blocks from faulty peers.
func TestFullFinalizedAnnouncementDiscarding(t *testing.T) {
testFinalizedAnnouncementDiscarding(t, false)
}
func TestLightFinalizedAnnouncementDiscarding(t *testing.T) {
testFinalizedAnnouncementDiscarding(t, true)
}

func testFinalizedAnnouncementDiscarding(t *testing.T, light bool) {
// Create a long chain to import and define the discard boundaries
hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)

head := hashes[len(hashes)/2]
justified := hashes[len(hashes)/2+1]
finalized := hashes[len(hashes)/2+2]
beforeFinalized := hashes[len(hashes)/2+3]

low, equal := len(hashes)/2+3, len(hashes)/2+2

// Create a tester and simulate a head block being the middle of the above chain
tester := newTester(light)

tester.lock.Lock()
tester.hashes = []common.Hash{beforeFinalized, finalized, justified, head}
tester.headers = map[common.Hash]*types.Header{
beforeFinalized: blocks[beforeFinalized].Header(),
finalized: blocks[finalized].Header(),
justified: blocks[justified].Header(),
head: blocks[head].Header(),
}
tester.blocks = map[common.Hash]*types.Block{
beforeFinalized: blocks[beforeFinalized],
finalized: blocks[finalized],
justified: blocks[justified],
head: blocks[head],
}
tester.lock.Unlock()

headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0)

fetching := make(chan struct{}, 2)
tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }

// Ensure that a block with a lower number than the finalized height is discarded
tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher, nil)
select {
case <-time.After(50 * time.Millisecond):
case <-fetching:
t.Fatalf("fetcher requested stale header")
}
// Ensure that a block with a same number of the finalized height is discarded
tester.fetcher.Notify("equal", hashes[equal], blocks[hashes[equal]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher, nil)
select {
case <-time.After(50 * time.Millisecond):
case <-fetching:
t.Fatalf("fetcher requested future header")
}
}

// Tests that peers announcing blocks with invalid numbers (i.e. not matching
// the headers provided afterwards) get dropped as malicious.
func TestFullInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, false) }
Expand Down
10 changes: 9 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,13 @@ func newHandler(config *handlerConfig) (*handler, error) {
heighter := func() uint64 {
return h.chain.CurrentBlock().NumberU64()
}
finalizeHeighter := func() uint64 {
fblock := h.chain.CurrentFinalBlock()
if fblock == nil {
return 0
}
return fblock.Number.Uint64()
}
inserter := func(blocks types.Blocks) (int, error) {
// All the block fetcher activities should be disabled
// after the transition. Print the warning log.
Expand Down Expand Up @@ -322,7 +329,8 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
return n, err
}
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer)
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock,
heighter, finalizeHeighter, nil, inserter, h.removePeer)

fetchTx := func(peer string, hashes []common.Hash) error {
p := h.peers.peer(peer)
Expand Down