Skip to content

Commit

Permalink
download diff layer: update
Browse files Browse the repository at this point in the history
Signed-off-by: kyrie-yl <lei.y@binance.com>
  • Loading branch information
kyrie-yl committed Sep 2, 2021
1 parent 1df6de6 commit c91de18
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 46 deletions.
35 changes: 19 additions & 16 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ type Downloader struct {

// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func(SyncMode, string, []*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func(SyncMode, string, []*types.Header) // Method to call upon starting a receipt fetch
bodyFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}

Expand Down Expand Up @@ -232,18 +232,25 @@ type IPeerSet interface {

func DiffBodiesFetchOption(peers IPeerSet) DownloadOption {
return func(dl *Downloader) *Downloader {
var hook = func(mode SyncMode, peerID string, headers []*types.Header) {
if mode == FullSync {
if ep := peers.GetDiffPeer(peerID); ep != nil {
hashes := make([]common.Hash, 0, len(headers))
for _, header := range headers {
hashes = append(hashes, header.Hash())
var hook = func(headers []*types.Header, options ...interface{}) {
if len(options) < 2 {
return
}
if mode, ok := options[0].(SyncMode); ok {
if mode == FullSync {
if peerID, ok := options[1].(string); ok {
if ep := peers.GetDiffPeer(peerID); ep != nil {
hashes := make([]common.Hash, 0, len(headers))
for _, header := range headers {
hashes = append(hashes, header.Hash())
}
ep.RequestDiffLayers(hashes)
}
}
ep.RequestDiffLayers(hashes)
}
}
}
dl.SetBodyFetchHook(hook)
dl.bodyFetchHook = hook
return dl
}
}
Expand Down Expand Up @@ -323,10 +330,6 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
}
}

func (d *Downloader) SetBodyFetchHook(hook func(SyncMode, string, []*types.Header)) {
d.bodyFetchHook = hook
}

// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
return atomic.LoadInt32(&d.synchronising) > 0
Expand Down Expand Up @@ -1396,7 +1399,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// - kind: textual label of the type being downloaded to display in log messages
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
fetchHook func(SyncMode, string, []*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
fetchHook func([]*types.Header, ...interface{}), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {

// Create a ticker to detect expired retrieval tasks
Expand Down Expand Up @@ -1545,7 +1548,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
fetchHook(d.getMode(), peer.id, request.Headers)
fetchHook(request.Headers, d.getMode(), peer.id)
}
if err := fetch(peer, request); err != nil {
// Although we could try and make an attempt to fix this, this error really
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,10 +921,10 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {

// Instrument the downloader to signal body requests
bodiesHave, receiptsHave := int32(0), int32(0)
tester.downloader.bodyFetchHook = func(headers []*types.Header) {
tester.downloader.bodyFetchHook = func(headers []*types.Header, options ...interface{}) {
atomic.AddInt32(&bodiesHave, int32(len(headers)))
}
tester.downloader.receiptFetchHook = func(headers []*types.Header) {
tester.downloader.receiptFetchHook = func(headers []*types.Header, options ...interface{}) {
atomic.AddInt32(&receiptsHave, int32(len(headers)))
}
// Synchronise with the peer and make sure all blocks were retrieved
Expand Down
8 changes: 4 additions & 4 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type handler struct {
stateBloom *trie.SyncBloom
blockFetcher *fetcher.BlockFetcher
txFetcher *fetcher.TxFetcher
peers *PeerSet
peers *peerSet

eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
Expand Down Expand Up @@ -234,7 +234,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer)

fetchTx := func(peer string, hashes []common.Hash) error {
p := h.peers.Peer(peer)
p := h.peers.peer(peer)
if p == nil {
return errors.New("unknown peer")
}
Expand Down Expand Up @@ -306,7 +306,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
}
defer h.removePeer(peer.ID())

p := h.peers.Peer(peer.ID())
p := h.peers.peer(peer.ID())
if p == nil {
return errors.New("peer dropped during handling")
}
Expand Down Expand Up @@ -398,7 +398,7 @@ func (h *handler) removePeer(id string) {
logger = log.New("peer", id[:8])
}
// Abort if the peer does not exist
peer := h.peers.Peer(id)
peer := h.peers.peer(id)
if peer == nil {
logger.Error("Ethereum peer removal failed", "err", errPeerNotRegistered)
return
Expand Down
2 changes: 1 addition & 1 deletion eth/handler_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (h *diffHandler) RunPeer(peer *diff.Peer, hand diff.Handler) error {

// PeerInfo retrieves all known `diff` information about a peer.
func (h *diffHandler) PeerInfo(id enode.ID) interface{} {
if p := h.peers.Peer(id.String()); p != nil {
if p := h.peers.peer(id.String()); p != nil {
if p.DiffExt != nil {
return p.DiffExt.info()
}
Expand Down
6 changes: 3 additions & 3 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (h *ethHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error {

// PeerInfo retrieves all known `eth` information about a peer.
func (h *ethHandler) PeerInfo(id enode.ID) interface{} {
if p := h.peers.Peer(id.String()); p != nil {
if p := h.peers.peer(id.String()); p != nil {
return p.info()
}
return nil
Expand Down Expand Up @@ -107,7 +107,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
// handleHeaders is invoked from a peer's message handler when it transmits a batch
// of headers for the local node to process.
func (h *ethHandler) handleHeaders(peer *eth.Peer, headers []*types.Header) error {
p := h.peers.Peer(peer.ID())
p := h.peers.peer(peer.ID())
if p == nil {
return errors.New("unregistered during callback")
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash,
var diffFetcher fetcher.DiffRequesterFn
if h.lightSync {
// the peer support diff protocol
if ep := h.peers.Peer(peer.ID()); ep != nil && ep.DiffExt != nil {
if ep := h.peers.peer(peer.ID()); ep != nil && ep.DiffExt != nil {
diffFetcher = ep.DiffExt.RequestDiffLayers
}
}
Expand Down
2 changes: 1 addition & 1 deletion eth/handler_snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (h *snapHandler) RunPeer(peer *snap.Peer, hand snap.Handler) error {

// PeerInfo retrieves all known `snap` information about a peer.
func (h *snapHandler) PeerInfo(id enode.ID) interface{} {
if p := h.peers.Peer(id.String()); p != nil {
if p := h.peers.peer(id.String()); p != nil {
if p.snapExt != nil {
return p.snapExt.info()
}
Expand Down
38 changes: 19 additions & 19 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ var (
errDiffWithoutEth = errors.New("peer connected on diff without compatible eth support")
)

// PeerSet represents the collection of active peers currently participating in
// peerSet represents the collection of active peers currently participating in
// the `eth` protocol, with or without the `snap` extension.
type PeerSet struct {
type peerSet struct {
peers map[string]*ethPeer // Peers connected on the `eth` protocol
snapPeers int // Number of `snap` compatible peers for connection prioritization

Expand All @@ -68,8 +68,8 @@ type PeerSet struct {
}

// newPeerSet creates a new peer set to track the active participants.
func newPeerSet() *PeerSet {
return &PeerSet{
func newPeerSet() *peerSet {
return &peerSet{
peers: make(map[string]*ethPeer),
snapWait: make(map[string]chan *snap.Peer),
snapPend: make(map[string]*snap.Peer),
Expand All @@ -81,7 +81,7 @@ func newPeerSet() *PeerSet {
// registerSnapExtension unblocks an already connected `eth` peer waiting for its
// `snap` extension, or if no such peer exists, tracks the extension for the time
// being until the `eth` main protocol starts looking for it.
func (ps *PeerSet) registerSnapExtension(peer *snap.Peer) error {
func (ps *peerSet) registerSnapExtension(peer *snap.Peer) error {
// Reject the peer if it advertises `snap` without `eth` as `snap` is only a
// satellite protocol meaningful with the chain selection of `eth`
if !peer.RunningCap(eth.ProtocolName, eth.ProtocolVersions) {
Expand Down Expand Up @@ -111,7 +111,7 @@ func (ps *PeerSet) registerSnapExtension(peer *snap.Peer) error {
// registerDiffExtension unblocks an already connected `eth` peer waiting for its
// `diff` extension, or if no such peer exists, tracks the extension for the time
// being until the `eth` main protocol starts looking for it.
func (ps *PeerSet) registerDiffExtension(peer *diff.Peer) error {
func (ps *peerSet) registerDiffExtension(peer *diff.Peer) error {
// Reject the peer if it advertises `diff` without `eth` as `diff` is only a
// satellite protocol meaningful with the chain selection of `eth`
if !peer.RunningCap(eth.ProtocolName, eth.ProtocolVersions) {
Expand Down Expand Up @@ -140,7 +140,7 @@ func (ps *PeerSet) registerDiffExtension(peer *diff.Peer) error {

// waitExtensions blocks until all satellite protocols are connected and tracked
// by the peerset.
func (ps *PeerSet) waitSnapExtension(peer *eth.Peer) (*snap.Peer, error) {
func (ps *peerSet) waitSnapExtension(peer *eth.Peer) (*snap.Peer, error) {
// If the peer does not support a compatible `snap`, don't wait
if !peer.RunningCap(snap.ProtocolName, snap.ProtocolVersions) {
return nil, nil
Expand Down Expand Up @@ -174,7 +174,7 @@ func (ps *PeerSet) waitSnapExtension(peer *eth.Peer) (*snap.Peer, error) {

// waitDiffExtension blocks until all satellite protocols are connected and tracked
// by the peerset.
func (ps *PeerSet) waitDiffExtension(peer *eth.Peer) (*diff.Peer, error) {
func (ps *peerSet) waitDiffExtension(peer *eth.Peer) (*diff.Peer, error) {
// If the peer does not support a compatible `diff`, don't wait
if !peer.RunningCap(diff.ProtocolName, diff.ProtocolVersions) {
return nil, nil
Expand Down Expand Up @@ -206,16 +206,16 @@ func (ps *PeerSet) waitDiffExtension(peer *eth.Peer) (*diff.Peer, error) {
return <-wait, nil
}

func (ps *PeerSet) GetDiffPeer(pid string) downloader.IDiffPeer {
if p := ps.Peer(pid); p != nil && p.DiffExt != nil {
func (ps *peerSet) GetDiffPeer(pid string) downloader.IDiffPeer {
if p := ps.peer(pid); p != nil && p.DiffExt != nil {
return p.DiffExt
}
return nil
}

// registerPeer injects a new `eth` peer into the working set, or returns an error
// if the peer is already known.
func (ps *PeerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Peer) error {
func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Peer) error {
// Start tracking the new peer
ps.lock.Lock()
defer ps.lock.Unlock()
Expand Down Expand Up @@ -243,7 +243,7 @@ func (ps *PeerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Pe

// unregisterPeer removes a remote peer from the active set, disabling any further
// actions to/from that particular entity.
func (ps *PeerSet) unregisterPeer(id string) error {
func (ps *peerSet) unregisterPeer(id string) error {
ps.lock.Lock()
defer ps.lock.Unlock()

Expand All @@ -259,7 +259,7 @@ func (ps *PeerSet) unregisterPeer(id string) error {
}

// peer retrieves the registered peer with the given id.
func (ps *PeerSet) Peer(id string) *ethPeer {
func (ps *peerSet) peer(id string) *ethPeer {
ps.lock.RLock()
defer ps.lock.RUnlock()

Expand All @@ -268,7 +268,7 @@ func (ps *PeerSet) Peer(id string) *ethPeer {

// peersWithoutBlock retrieves a list of peers that do not have a given block in
// their set of known hashes so it might be propagated to them.
func (ps *PeerSet) peersWithoutBlock(hash common.Hash) []*ethPeer {
func (ps *peerSet) peersWithoutBlock(hash common.Hash) []*ethPeer {
ps.lock.RLock()
defer ps.lock.RUnlock()

Expand All @@ -283,7 +283,7 @@ func (ps *PeerSet) peersWithoutBlock(hash common.Hash) []*ethPeer {

// peersWithoutTransaction retrieves a list of peers that do not have a given
// transaction in their set of known hashes.
func (ps *PeerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer {
func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer {
ps.lock.RLock()
defer ps.lock.RUnlock()

Expand All @@ -299,15 +299,15 @@ func (ps *PeerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer {
// len returns if the current number of `eth` peers in the set. Since the `snap`
// peers are tied to the existence of an `eth` connection, that will always be a
// subset of `eth`.
func (ps *PeerSet) len() int {
func (ps *peerSet) len() int {
ps.lock.RLock()
defer ps.lock.RUnlock()

return len(ps.peers)
}

// snapLen returns if the current number of `snap` peers in the set.
func (ps *PeerSet) snapLen() int {
func (ps *peerSet) snapLen() int {
ps.lock.RLock()
defer ps.lock.RUnlock()

Expand All @@ -316,7 +316,7 @@ func (ps *PeerSet) snapLen() int {

// peerWithHighestTD retrieves the known peer with the currently highest total
// difficulty.
func (ps *PeerSet) peerWithHighestTD() *eth.Peer {
func (ps *peerSet) peerWithHighestTD() *eth.Peer {
ps.lock.RLock()
defer ps.lock.RUnlock()

Expand All @@ -333,7 +333,7 @@ func (ps *PeerSet) peerWithHighestTD() *eth.Peer {
}

// close disconnects all peers.
func (ps *PeerSet) close() {
func (ps *peerSet) close() {
ps.lock.Lock()
defer ps.lock.Unlock()

Expand Down

0 comments on commit c91de18

Please sign in to comment.