Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ligth sync: download difflayer #2

Merged
merged 4 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ type Downloader struct {
quitLock sync.Mutex // Lock to prevent double closes

// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}

// LightChain encapsulates functions required to synchronise a light chain.
Expand Down Expand Up @@ -220,8 +220,43 @@ type BlockChain interface {
Snapshots() *snapshot.Tree
}

type DownloadOption func(downloader *Downloader) *Downloader

type IDiffPeer interface {
RequestDiffLayers([]common.Hash) error
}

type IPeerSet interface {
GetDiffPeer(string) IDiffPeer
}

func DiffBodiesFetchOption(peers IPeerSet) DownloadOption {
return func(dl *Downloader) *Downloader {
var hook = func(headers []*types.Header, args ...interface{}) {
if len(args) < 2 {
return
}
if mode, ok := args[0].(SyncMode); ok {
if mode == FullSync {
if peerID, ok := args[1].(string); ok {
if ep := peers.GetDiffPeer(peerID); ep != nil {
hashes := make([]common.Hash, 0, len(headers))
for _, header := range headers {
hashes = append(hashes, header.Hash())
}
ep.RequestDiffLayers(hashes)
}
}
}
}
}
dl.bodyFetchHook = hook
return dl
}
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, options ...DownloadOption) *Downloader {
if lightchain == nil {
lightchain = chain
}
Expand Down Expand Up @@ -252,6 +287,11 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
},
trackStateReq: make(chan *stateReq),
}
for _, option := range options {
if dl != nil {
dl = option(dl)
}
}
go dl.qosTuner()
go dl.stateFetcher()
return dl
Expand Down Expand Up @@ -1359,7 +1399,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// - kind: textual label of the type being downloaded to display in log messages
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
fetchHook func([]*types.Header, ...interface{}), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {

// Create a ticker to detect expired retrieval tasks
Expand Down Expand Up @@ -1508,7 +1548,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
fetchHook(request.Headers)
fetchHook(request.Headers, d.getMode(), peer.id)
}
if err := fetch(peer, request); err != nil {
// Although we could try and make an attempt to fix this, this error really
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,10 +921,10 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {

// Instrument the downloader to signal body requests
bodiesHave, receiptsHave := int32(0), int32(0)
tester.downloader.bodyFetchHook = func(headers []*types.Header) {
tester.downloader.bodyFetchHook = func(headers []*types.Header, _ ...interface{}) {
atomic.AddInt32(&bodiesHave, int32(len(headers)))
}
tester.downloader.receiptFetchHook = func(headers []*types.Header) {
tester.downloader.receiptFetchHook = func(headers []*types.Header, _ ...interface{}) {
atomic.AddInt32(&receiptsHave, int32(len(headers)))
}
// Synchronise with the peer and make sure all blocks were retrieved
Expand Down
6 changes: 5 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,11 @@ func newHandler(config *handlerConfig) (*handler, error) {
if atomic.LoadUint32(&h.fastSync) == 1 && atomic.LoadUint32(&h.snapSync) == 0 {
h.stateBloom = trie.NewSyncBloom(config.BloomCache, config.Database)
}
h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer)
var downloadOptions []downloader.DownloadOption
if h.diffSync {
downloadOptions = append(downloadOptions, downloader.DiffBodiesFetchOption(h.peers))
}
h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer, downloadOptions...)

// Construct the fetcher (short sync)
validator := func(header *types.Header) error {
Expand Down
8 changes: 8 additions & 0 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/protocols/diff"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
Expand Down Expand Up @@ -205,6 +206,13 @@ func (ps *peerSet) waitDiffExtension(peer *eth.Peer) (*diff.Peer, error) {
return <-wait, nil
}

func (ps *peerSet) GetDiffPeer(pid string) downloader.IDiffPeer {
if p := ps.peer(pid); p != nil && p.diffExt != nil {
return p.diffExt
}
return nil
}

// registerPeer injects a new `eth` peer into the working set, or returns an error
// if the peer is already known.
func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Peer) error {
Expand Down