Skip to content

Commit

Permalink
ligth sync: download difflayer (#2)
Browse files Browse the repository at this point in the history
* ligth sync: download difflayer

Signed-off-by: kyrie-yl <lei.y@binance.com>

* download diff layer: fix according to the comments

Signed-off-by: kyrie-yl <lei.y@binance.com>

* download diff layer: update

Signed-off-by: kyrie-yl <lei.y@binance.com>

* download diff layer: fix accroding comments

Signed-off-by: kyrie-yl <lei.y@binance.com>

Co-authored-by: kyrie-yl <lei.y@binance.com>
  • Loading branch information
kyrie-yl and kyrie-yl authored Sep 2, 2021
1 parent 42e86d8 commit f9bb1c5
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 10 deletions.
54 changes: 47 additions & 7 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ type Downloader struct {
quitLock sync.Mutex // Lock to prevent double closes

// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}

// LightChain encapsulates functions required to synchronise a light chain.
Expand Down Expand Up @@ -220,8 +220,43 @@ type BlockChain interface {
Snapshots() *snapshot.Tree
}

type DownloadOption func(downloader *Downloader) *Downloader

type IDiffPeer interface {
RequestDiffLayers([]common.Hash) error
}

type IPeerSet interface {
GetDiffPeer(string) IDiffPeer
}

func DiffBodiesFetchOption(peers IPeerSet) DownloadOption {
return func(dl *Downloader) *Downloader {
var hook = func(headers []*types.Header, args ...interface{}) {
if len(args) < 2 {
return
}
if mode, ok := args[0].(SyncMode); ok {
if mode == FullSync {
if peerID, ok := args[1].(string); ok {
if ep := peers.GetDiffPeer(peerID); ep != nil {
hashes := make([]common.Hash, 0, len(headers))
for _, header := range headers {
hashes = append(hashes, header.Hash())
}
ep.RequestDiffLayers(hashes)
}
}
}
}
}
dl.bodyFetchHook = hook
return dl
}
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, options ...DownloadOption) *Downloader {
if lightchain == nil {
lightchain = chain
}
Expand Down Expand Up @@ -252,6 +287,11 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
},
trackStateReq: make(chan *stateReq),
}
for _, option := range options {
if dl != nil {
dl = option(dl)
}
}
go dl.qosTuner()
go dl.stateFetcher()
return dl
Expand Down Expand Up @@ -1359,7 +1399,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// - kind: textual label of the type being downloaded to display in log messages
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
fetchHook func([]*types.Header, ...interface{}), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {

// Create a ticker to detect expired retrieval tasks
Expand Down Expand Up @@ -1508,7 +1548,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
fetchHook(request.Headers)
fetchHook(request.Headers, d.getMode(), peer.id)
}
if err := fetch(peer, request); err != nil {
// Although we could try and make an attempt to fix this, this error really
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,10 +921,10 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {

// Instrument the downloader to signal body requests
bodiesHave, receiptsHave := int32(0), int32(0)
tester.downloader.bodyFetchHook = func(headers []*types.Header) {
tester.downloader.bodyFetchHook = func(headers []*types.Header, _ ...interface{}) {
atomic.AddInt32(&bodiesHave, int32(len(headers)))
}
tester.downloader.receiptFetchHook = func(headers []*types.Header) {
tester.downloader.receiptFetchHook = func(headers []*types.Header, _ ...interface{}) {
atomic.AddInt32(&receiptsHave, int32(len(headers)))
}
// Synchronise with the peer and make sure all blocks were retrieved
Expand Down
6 changes: 5 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,11 @@ func newHandler(config *handlerConfig) (*handler, error) {
if atomic.LoadUint32(&h.fastSync) == 1 && atomic.LoadUint32(&h.snapSync) == 0 {
h.stateBloom = trie.NewSyncBloom(config.BloomCache, config.Database)
}
h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer)
var downloadOptions []downloader.DownloadOption
if h.diffSync {
downloadOptions = append(downloadOptions, downloader.DiffBodiesFetchOption(h.peers))
}
h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer, downloadOptions...)

// Construct the fetcher (short sync)
validator := func(header *types.Header) error {
Expand Down
8 changes: 8 additions & 0 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/protocols/diff"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
Expand Down Expand Up @@ -205,6 +206,13 @@ func (ps *peerSet) waitDiffExtension(peer *eth.Peer) (*diff.Peer, error) {
return <-wait, nil
}

func (ps *peerSet) GetDiffPeer(pid string) downloader.IDiffPeer {
if p := ps.peer(pid); p != nil && p.diffExt != nil {
return p.diffExt
}
return nil
}

// registerPeer injects a new `eth` peer into the working set, or returns an error
// if the peer is already known.
func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Peer) error {
Expand Down

0 comments on commit f9bb1c5

Please sign in to comment.