diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 6f59b29a5e..5ed70898fb 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -161,10 +161,10 @@ type Downloader struct { quitLock sync.Mutex // Lock to prevent double closes // Testing hooks - syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run - bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch - receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch - chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) + syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run + bodyFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a block body fetch + receiptFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a receipt fetch + chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) } // LightChain encapsulates functions required to synchronise a light chain. @@ -220,8 +220,43 @@ type BlockChain interface { Snapshots() *snapshot.Tree } +type DownloadOption func(downloader *Downloader) *Downloader + +type IDiffPeer interface { + RequestDiffLayers([]common.Hash) error +} + +type IPeerSet interface { + GetDiffPeer(string) IDiffPeer +} + +func DiffBodiesFetchOption(peers IPeerSet) DownloadOption { + return func(dl *Downloader) *Downloader { + var hook = func(headers []*types.Header, args ...interface{}) { + if len(args) < 2 { + return + } + if mode, ok := args[0].(SyncMode); ok { + if mode == FullSync { + if peerID, ok := args[1].(string); ok { + if ep := peers.GetDiffPeer(peerID); ep != nil { + hashes := make([]common.Hash, 0, len(headers)) + for _, header := range headers { + hashes = append(hashes, header.Hash()) + } + ep.RequestDiffLayers(hashes) + } + } + } + } + } + dl.bodyFetchHook = hook + return dl + } +} + // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { +func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, options ...DownloadOption) *Downloader { if lightchain == nil { lightchain = chain } @@ -252,6 +287,11 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, }, trackStateReq: make(chan *stateReq), } + for _, option := range options { + if dl != nil { + dl = option(dl) + } + } go dl.qosTuner() go dl.stateFetcher() return dl @@ -1359,7 +1399,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { // - kind: textual label of the type being downloaded to display in log messages func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool), - fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int, + fetchHook func([]*types.Header, ...interface{}), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int, idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error { // Create a ticker to detect expired retrieval tasks @@ -1508,7 +1548,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) } // Fetch the chunk and make sure any errors return the hashes to the queue if fetchHook != nil { - fetchHook(request.Headers) + fetchHook(request.Headers, d.getMode(), peer.id) } if err := fetch(peer, request); err != nil { // Although we could try and make an attempt to fix this, this error really diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 794160993b..66f6872025 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -921,10 +921,10 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { // Instrument the downloader to signal body requests bodiesHave, receiptsHave := int32(0), int32(0) - tester.downloader.bodyFetchHook = func(headers []*types.Header) { + tester.downloader.bodyFetchHook = func(headers []*types.Header, _ ...interface{}) { atomic.AddInt32(&bodiesHave, int32(len(headers))) } - tester.downloader.receiptFetchHook = func(headers []*types.Header) { + tester.downloader.receiptFetchHook = func(headers []*types.Header, _ ...interface{}) { atomic.AddInt32(&receiptsHave, int32(len(headers))) } // Synchronise with the peer and make sure all blocks were retrieved diff --git a/eth/handler.go b/eth/handler.go index 90d2e96159..03867c3920 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -192,7 +192,11 @@ func newHandler(config *handlerConfig) (*handler, error) { if atomic.LoadUint32(&h.fastSync) == 1 && atomic.LoadUint32(&h.snapSync) == 0 { h.stateBloom = trie.NewSyncBloom(config.BloomCache, config.Database) } - h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer) + var downloadOptions []downloader.DownloadOption + if h.diffSync { + downloadOptions = append(downloadOptions, downloader.DiffBodiesFetchOption(h.peers)) + } + h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer, downloadOptions...) // Construct the fetcher (short sync) validator := func(header *types.Header) error { diff --git a/eth/peerset.go b/eth/peerset.go index b5ac95c1d6..f0955f34c6 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/protocols/diff" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" @@ -205,6 +206,13 @@ func (ps *peerSet) waitDiffExtension(peer *eth.Peer) (*diff.Peer, error) { return <-wait, nil } +func (ps *peerSet) GetDiffPeer(pid string) downloader.IDiffPeer { + if p := ps.peer(pid); p != nil && p.diffExt != nil { + return p.diffExt + } + return nil +} + // registerPeer injects a new `eth` peer into the working set, or returns an error // if the peer is already known. func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Peer) error {