les: fix UDP connection query (ethereum#22451)
This PR fixes multiple issues with the UDP connection pre-negotiation feature:

- the enable condition was wrong: it checked for the DiscV5 struct at a point where it was not yet initialized, disabling the feature even when discv5 was enabled (see the sketch below)
- the server pool re-queried nodes that were already connected whenever the discovery iterators returned them again
- servers replied positively before they were synced and actually willing to accept connections

Metrics are also added on the server side, counting the positive and negative replies to served connection queries.
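
To illustrate the first fix, here is a minimal, self-contained sketch of the initialization-order problem; the type and function names are hypothetical stand-ins, not the PR's code. The DiscV5 handle only exists after the p2p server has started, so the enable decision has to be taken from the static config at construction time and re-validated at startup:

package main

import "log"

// p2pServer stands in for p2p.Server: its discV5 handle stays nil
// until the server starts, which happens after the client is built.
type p2pServer struct {
	discV5 *struct{}
}

type lightClient struct {
	srv        *p2pServer
	udpEnabled bool
}

// newLightClient runs before the p2p server starts. Checking
// srv.discV5 here (the old, buggy condition) would always see nil
// and disable UDP pre-negotiation even with discv5 enabled.
func newLightClient(srv *p2pServer, discoveryV5 bool) *lightClient {
	return &lightClient{srv: srv, udpEnabled: discoveryV5}
}

// start re-validates the flag once the handle can actually exist,
// degrading gracefully if discv5 was never initialized.
func (c *lightClient) start() {
	if c.udpEnabled && c.srv.discV5 == nil {
		c.udpEnabled = false
		log.Println("Discovery v5 is not initialized")
	}
}

func main() {
	srv := &p2pServer{}
	c := newLightClient(srv, true) // discv5 enabled in the config
	srv.discV5 = &struct{}{}       // set when the p2p server starts
	c.start()
	log.Println("udpEnabled:", c.udpEnabled) // prints true
}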
zsfelfoldi authored Mar 16, 2021
1 parent 94ab4ea commit 62d8022
Showing 8 changed files with 110 additions and 76 deletions.
18 changes: 12 additions & 6 deletions les/client.go
@@ -73,8 +73,9 @@ type LightEthereum struct {
 	accountManager *accounts.Manager
 	netRPCService  *ethapi.PublicNetAPI
 
-	p2pServer *p2p.Server
-	p2pConfig *p2p.Config
+	p2pServer  *p2p.Server
+	p2pConfig  *p2p.Config
+	udpEnabled bool
 }
 
 // New creates an instance of the light client.
@@ -113,10 +114,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
 		bloomIndexer:   core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
 		p2pServer:      stack.Server(),
 		p2pConfig:      &stack.Config().P2P,
+		udpEnabled:     stack.Config().P2P.DiscoveryV5,
 	}
 
 	var prenegQuery vfc.QueryFunc
-	if leth.p2pServer.DiscV5 != nil {
+	if leth.udpEnabled {
 		prenegQuery = leth.prenegQuery
 	}
 	leth.serverPool, leth.serverPoolIterator = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, prenegQuery, &mclock.System{}, config.UltraLightServers, requestList)
@@ -198,7 +200,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
 
 // VfluxRequest sends a batch of requests to the given node through discv5 UDP TalkRequest and returns the responses
 func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.Replies {
-	if s.p2pServer.DiscV5 == nil {
+	if !s.udpEnabled {
 		return nil
 	}
 	reqsEnc, _ := rlp.EncodeToBytes(&reqs)
@@ -215,7 +217,7 @@ func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.R
 func (s *LightEthereum) vfxVersion(n *enode.Node) uint {
 	if n.Seq() == 0 {
 		var err error
-		if s.p2pServer.DiscV5 == nil {
+		if !s.udpEnabled {
 			return 0
 		}
 		if n, err = s.p2pServer.DiscV5.RequestENR(n); n != nil && err == nil && n.Seq() != 0 {
@@ -346,7 +348,11 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
 func (s *LightEthereum) Start() error {
 	log.Warn("Light client mode is an experimental feature")
 
-	discovery, err := s.setupDiscovery(s.p2pConfig)
+	if s.udpEnabled && s.p2pServer.DiscV5 == nil {
+		s.udpEnabled = false
+		log.Error("Discovery v5 is not initialized")
+	}
+	discovery, err := s.setupDiscovery()
 	if err != nil {
 		return err
 	}
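
The body of VfluxRequest is truncated above. As a rough sketch of how such a round-trip typically looks (assuming discv5's TalkRequest API and "vfx" as the vflux talk-protocol name; treat both as unverified assumptions), the batch is RLP-encoded, sent as a single UDP talk request, and the reply decoded back:

package sketch

import (
	"github.com/ethereum/go-ethereum/les/vflux"
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/ethereum/go-ethereum/rlp"
)

// talkQuery is a hypothetical helper, not the PR's code: it performs
// one UDP query round-trip and treats any failure as "no answer".
func talkQuery(disc *discover.UDPv5, n *enode.Node, reqs vflux.Requests) vflux.Replies {
	reqsEnc, err := rlp.EncodeToBytes(&reqs)
	if err != nil {
		return nil
	}
	repliesEnc, err := disc.TalkRequest(n, "vfx", reqsEnc) // assumed protocol id
	if err != nil || len(repliesEnc) == 0 {
		return nil
	}
	var replies vflux.Replies
	if rlp.DecodeBytes(repliesEnc, &replies) != nil {
		return nil
	}
	return replies
}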
18 changes: 16 additions & 2 deletions les/clientpool.go
@@ -72,6 +72,7 @@ type clientPool struct {
 	clock          mclock.Clock
 	closed         bool
 	removePeer     func(enode.ID)
+	synced         func() bool
 	ns             *nodestate.NodeStateMachine
 	pp             *vfs.PriorityPool
 	bt             *vfs.BalanceTracker
@@ -107,7 +108,7 @@ type clientInfo struct {
 }
 
 // newClientPool creates a new client pool
-func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
+func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID), synced func() bool) *clientPool {
 	pool := &clientPool{
 		ns:                  ns,
 		BalanceTrackerSetup: balanceTrackerSetup,
@@ -116,6 +117,7 @@ func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap
 		minCap:        minCap,
 		connectedBias: connectedBias,
 		removePeer:    removePeer,
+		synced:        synced,
 	}
 	pool.bt = vfs.NewBalanceTracker(ns, balanceTrackerSetup, lesDb, clock, &utils.Expirer{}, &utils.Expirer{})
 	pool.pp = vfs.NewPriorityPool(ns, priorityPoolSetup, clock, minCap, connectedBias, 4)
@@ -396,6 +398,13 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by
 	if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen {
 		return nil
 	}
+	result := make(vflux.CapacityQueryReply, len(req.AddTokens))
+	if !f.synced() {
+		capacityQueryZeroMeter.Mark(1)
+		reply, _ := rlp.EncodeToBytes(&result)
+		return reply
+	}
+
 	node := f.ns.GetNode(id)
 	if node == nil {
 		node = enode.SignNull(&enr.Record{}, id)
@@ -416,7 +425,6 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by
 	}
 	// use vfs.CapacityCurve to answer request for multiple newly bought token amounts
 	curve := f.pp.GetCapacityCurve().Exclude(id)
-	result := make(vflux.CapacityQueryReply, len(req.AddTokens))
 	bias := time.Second * time.Duration(req.Bias)
 	if f.connectedBias > bias {
 		bias = f.connectedBias
@@ -434,6 +442,12 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by
 			result[i] = 0
 		}
 	}
+	// add first result to metrics (don't care about priority client multi-queries yet)
+	if result[0] == 0 {
+		capacityQueryZeroMeter.Mark(1)
+	} else {
+		capacityQueryNonZeroMeter.Mark(1)
+	}
 	reply, _ := rlp.EncodeToBytes(&result)
 	return reply
 }
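
A subtle point in the change above: an unsynced server still answers, but with an all-zero reply. A well-formed negative lets the querying client discard the candidate immediately instead of waiting for a UDP timeout. Below is a reduced, self-contained sketch of the gating-and-metering pattern; everything apart from the shape of the logic (meter names included) is made up for illustration:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/rlp"
)

var (
	zeroMeter    = metrics.NewRegisteredMeter("example/capQueryZero", nil)
	nonZeroMeter = metrics.NewRegisteredMeter("example/capQueryNonZero", nil)
)

// serveQuery mirrors the shape of serveCapQuery: while unsynced it
// refuses with a zeroed (but well-formed) reply; otherwise it
// estimates a capacity per requested token amount and meters the
// first result.
func serveQuery(synced func() bool, addTokens []int64, estimate func(int64) uint64) []byte {
	result := make([]uint64, len(addTokens))
	if !synced() {
		zeroMeter.Mark(1) // negative reply: not ready to serve yet
		reply, _ := rlp.EncodeToBytes(&result)
		return reply
	}
	for i, tokens := range addTokens {
		result[i] = estimate(tokens)
	}
	if result[0] == 0 {
		zeroMeter.Mark(1)
	} else {
		nonZeroMeter.Mark(1)
	}
	reply, _ := rlp.EncodeToBytes(&result)
	return reply
}

func main() {
	reply := serveQuery(func() bool { return false }, []int64{100}, nil)
	fmt.Printf("unsynced reply: %x\n", reply) // RLP encoding of [0]
}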
24 changes: 12 additions & 12 deletions les/clientpool_test.go
@@ -133,7 +133,7 @@ func testClientPool(t *testing.T, activeLimit, clientCount, paidCount int, rando
 		disconnFn = func(id enode.ID) {
 			disconnCh <- int(id[0]) + int(id[1])<<8
 		}
-		pool = newClientPool(testStateMachine(), db, 1, 0, &clock, disconnFn)
+		pool = newClientPool(testStateMachine(), db, 1, 0, &clock, disconnFn, alwaysTrueFn)
 	)
 	pool.ns.Start()
 
@@ -239,7 +239,7 @@ func TestConnectPaidClient(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10))
@@ -255,7 +255,7 @@ func TestConnectPaidClientToSmallPool(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -274,7 +274,7 @@ func TestConnectPaidClientToFullPool(t *testing.T) {
 		db = rawdb.NewMemoryDatabase()
 	)
 	removeFn := func(enode.ID) {} // Noop
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -304,7 +304,7 @@ func TestPaidClientKickedOut(t *testing.T) {
 	removeFn := func(id enode.ID) {
 		kickedCh <- int(id[0])
 	}
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	pool.bt.SetExpirationTCs(0, 0)
 	defer pool.stop()
@@ -335,7 +335,7 @@ func TestConnectFreeClient(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10))
@@ -352,7 +352,7 @@ func TestConnectFreeClientToFullPool(t *testing.T) {
 		db = rawdb.NewMemoryDatabase()
 	)
 	removeFn := func(enode.ID) {} // Noop
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -382,7 +382,7 @@ func TestFreeClientKickedOut(t *testing.T) {
 		kicked = make(chan int, 100)
 	)
 	removeFn := func(id enode.ID) { kicked <- int(id[0]) }
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -424,7 +424,7 @@ func TestPositiveBalanceCalculation(t *testing.T) {
 		kicked = make(chan int, 10)
 	)
 	removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -448,7 +448,7 @@ func TestDowngradePriorityClient(t *testing.T) {
 		kicked = make(chan int, 10)
 	)
 	removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -483,7 +483,7 @@ func TestNegativeBalanceCalculation(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -521,7 +521,7 @@ func TestInactiveClient(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(2, uint64(2))
5 changes: 2 additions & 3 deletions les/enr_entry.go
@@ -18,7 +18,6 @@ package les
 
 import (
 	"github.com/ethereum/go-ethereum/core/forkid"
-	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/dnsdisc"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/rlp"
@@ -42,7 +41,7 @@ type ethEntry struct {
 func (ethEntry) ENRKey() string { return "eth" }
 
 // setupDiscovery creates the node discovery source for the eth protocol.
-func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error) {
+func (eth *LightEthereum) setupDiscovery() (enode.Iterator, error) {
 	it := enode.NewFairMix(0)
 
 	// Enable DNS discovery.
@@ -56,7 +55,7 @@ func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error
 	}
 
 	// Enable DHT.
-	if cfg.DiscoveryV5 && eth.p2pServer.DiscV5 != nil {
+	if eth.udpEnabled {
 		it.AddSource(eth.p2pServer.DiscV5.RandomNodes())
 	}
 
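
With this change, setupDiscovery keys the DHT source purely off udpEnabled, which Start() has already validated. For context, a short sketch of how a merged enode.Iterator is typically drained by a consumer such as the dial scheduler; the handle callback is a hypothetical stand-in:

// drain pulls candidate nodes from the merged iterator until it is
// closed; Next blocks until one of the sources (DNS discovery or
// the discv5 DHT) yields a node.
func drain(it enode.Iterator, handle func(*enode.Node)) {
	defer it.Close()
	for it.Next() {
		handle(it.Node())
	}
}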
10 changes: 6 additions & 4 deletions les/metrics.go
@@ -73,10 +73,12 @@
 	serverConnectionGauge = metrics.NewRegisteredGauge("les/connection/server", nil)
 	clientConnectionGauge = metrics.NewRegisteredGauge("les/connection/client", nil)
 
-	totalCapacityGauge   = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
-	totalRechargeGauge   = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
-	totalConnectedGauge  = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
-	blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
+	totalCapacityGauge        = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
+	totalRechargeGauge        = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
+	totalConnectedGauge       = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
+	blockProcessingTimer      = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
+	capacityQueryZeroMeter    = metrics.NewRegisteredMeter("les/server/capQueryZero", nil)
+	capacityQueryNonZeroMeter = metrics.NewRegisteredMeter("les/server/capQueryNonZero", nil)
 
 	requestServedMeter = metrics.NewRegisteredMeter("les/server/req/avgServedTime", nil)
 	requestServedTimer = metrics.NewRegisteredTimer("les/server/req/servedTime", nil)
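
The two new meters make the sync gate's effect observable. As a usage sketch (the helper is hypothetical; it assumes go-ethereum's metrics registry API, where lookups return an untyped value), the counters can be read back from the default registry:

// capQueryCounts reads the cumulative reply counts back out of the
// default metrics registry, e.g. for logging or a debug endpoint.
func capQueryCounts() (zero, nonZero int64) {
	if m, ok := metrics.DefaultRegistry.Get("les/server/capQueryZero").(metrics.Meter); ok {
		zero = m.Count()
	}
	if m, ok := metrics.DefaultRegistry.Get("les/server/capQueryNonZero").(metrics.Meter); ok {
		nonZero = m.Count()
	}
	return zero, nonZero
}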
2 changes: 1 addition & 1 deletion les/server.go
@@ -149,7 +149,7 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
 		srv.maxCapacity = totalRecharge
 	}
 	srv.fcManager.SetCapacityLimits(srv.minCapacity, srv.maxCapacity, srv.minCapacity*2)
-	srv.clientPool = newClientPool(ns, lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient)
+	srv.clientPool = newClientPool(ns, lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient, issync)
 	srv.clientPool.setDefaultFactors(vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}, vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1})
 
 	checkpoint := srv.latestLocalCheckpoint()
6 changes: 5 additions & 1 deletion les/test_helper.go
@@ -307,7 +307,7 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
 	}
 	server.costTracker, server.minCapacity = newCostTracker(db, server.config)
 	server.costTracker.testCostList = testCostList(0) // Disable flow control mechanism.
-	server.clientPool = newClientPool(ns, db, testBufRecharge, defaultConnectedBias, clock, func(id enode.ID) {})
+	server.clientPool = newClientPool(ns, db, testBufRecharge, defaultConnectedBias, clock, func(id enode.ID) {}, alwaysTrueFn)
 	server.clientPool.setLimits(10000, 10000) // Assign enough capacity for clientpool
 	server.handler = newServerHandler(server, simulation.Blockchain(), db, txpool, func() bool { return true })
 	if server.oracle != nil {
@@ -319,6 +319,10 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
 	return server.handler, simulation
 }
 
+func alwaysTrueFn() bool {
+	return true
+}
+
 // testPeer is a simulated peer to allow testing direct network calls.
 type testPeer struct {
 	cpeer *clientPeer