les: polish code #22625

Merged 3 commits on Apr 27, 2021
Changes from 1 commit
les/peer.go: 1 change (0 additions, 1 deletion)
@@ -1099,7 +1099,6 @@ func (p *clientPeer) Handshake(td *big.Int, head common.Hash, headNum uint64, ge
// set default announceType on server side
p.announceType = announceTypeSimple
}
p.fcClient = flowcontrol.NewClientNode(server.fcManager, p.fcParams)
}
return nil
})
les/server_handler.go: 21 changes (9 additions, 12 deletions)
@@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
@@ -123,26 +124,27 @@ func (h *serverHandler) handle(p *clientPeer) error {
p.Log().Debug("Light Ethereum handshake failed", "err", err)
return err
}

// Connected to another server, no messages expected, just wait for disconnection
if p.server {
if err := h.server.serverset.register(p); err != nil {
return err
}
// connected to another server, no messages expected, just wait for disconnection
_, err := p.rw.ReadMsg()
h.server.serverset.unregister(p)
return err
}
defer p.fcClient.Disconnect() // set by handshake if it's not another server
// Setup flow control mechanism for the peer
p.fcClient = flowcontrol.NewClientNode(h.server.fcManager, p.fcParams)
defer p.fcClient.Disconnect()

// Reject light clients if server is not synced.
//
// Put this checking here, so that "non-synced" les-server peers are still allowed
// to keep the connection.
// Reject light clients if server is not synced. Put this checking here, so
// that "non-synced" les-server peers are still allowed to keep the connection.
if !h.synced() {
p.Log().Debug("Light server not synced, rejecting peer")
return p2p.DiscRequested
}

// Register the peer into the peerset and clientpool
if err := h.server.peers.register(p); err != nil {
return err
}
@@ -151,19 +153,14 @@ func (h *serverHandler) handle(p *clientPeer) error {
p.Log().Debug("Client pool already closed")
return p2p.DiscRequested
}
activeCount, _ := h.server.clientPool.Active()
clientConnectionGauge.Update(int64(activeCount))
Contributor:
Why did you remove these updates? Do you think this metric is useless? Btw, the metric itself is still there in metrics.go, so this looks wrong.

Member Author:
Because they are already counted internally by the clientpool: https://github.com/ethereum/go-ethereum/blob/master/les/vflux/server/clientpool.go#L154
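
For readers of this thread, the pattern being referenced is roughly the following. This is a minimal, self-contained sketch, not the actual vflux clientpool code; the gauge name and the examplePool type are made up for illustration. The idea is that the pool updates the active-peer gauge inside its own Register/Unregister, so the server handler no longer needs to touch the metric at its call sites.

package main

import (
	"sync"

	"github.com/ethereum/go-ethereum/metrics"
)

// activePeersGauge is a hypothetical gauge for this example; the real LES
// metric is defined in les/metrics.go and updated by the vflux clientpool.
var activePeersGauge = metrics.NewRegisteredGauge("example/activePeers", nil)

// examplePool is an illustrative stand-in for a client pool that owns its
// "active peers" metric instead of relying on every caller to update it.
type examplePool struct {
	lock   sync.Mutex
	active map[string]struct{}
}

func newExamplePool() *examplePool {
	return &examplePool{active: make(map[string]struct{})}
}

// Register adds a peer and refreshes the gauge in one place.
func (p *examplePool) Register(id string) {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.active[id] = struct{}{}
	activePeersGauge.Update(int64(len(p.active)))
}

// Unregister removes a peer and refreshes the same gauge.
func (p *examplePool) Unregister(id string) {
	p.lock.Lock()
	defer p.lock.Unlock()
	delete(p.active, id)
	activePeersGauge.Update(int64(len(p.active)))
}

func main() {
	pool := newExamplePool()
	pool.Register("peer-1")
	pool.Register("peer-2")
	pool.Unregister("peer-1") // the gauge now reflects one active peer
}

With the count maintained in one place next to the pool's own bookkeeping, call sites such as the handler cannot let it drift out of sync.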

p.connectedAt = mclock.Now()

var wg sync.WaitGroup // Wait group used to track all in-flight task routines.

defer func() {
wg.Wait() // Ensure all background task routines have exited.
h.server.clientPool.Unregister(p)
h.server.peers.unregister(p.ID())
p.balance = nil
activeCount, _ := h.server.clientPool.Active()
clientConnectionGauge.Update(int64(activeCount))
connectionTimer.Update(time.Duration(mclock.Now() - p.connectedAt))
}()

les/vflux/server/prioritypool.go: 106 changes (55 additions, 51 deletions)
@@ -63,20 +63,22 @@ type priorityPool struct {
ns *nodestate.NodeStateMachine
clock mclock.Clock
lock sync.Mutex
inactiveQueue *prque.Prque
maxCount, maxCap uint64
minCap uint64
activeBias time.Duration
capacityStepDiv, fineStepDiv uint64

// The snapshot of priority pool for query.
cachedCurve *capacityCurve
ccUpdatedAt mclock.AbsTime
ccUpdateForced bool

tempState []*ppNodeInfo // nodes currently in temporary state
// the following fields represent the temporary state if tempState is not empty
// Runtime status of prioritypool, represents the
// temporary state if tempState is not empty
tempState []*ppNodeInfo
activeCount, activeCap uint64
activeQueue *prque.LazyQueue
inactiveQueue *prque.Prque
}

// ppNodeInfo is the internal node descriptor of priorityPool
@@ -89,8 +91,9 @@ type ppNodeInfo struct {

tempState bool // should only be true while the priorityPool lock is held
tempCapacity uint64 // equals capacity when tempState is false

// the following fields only affect the temporary state and they are set to their
// default value when entering the temp state
// default value when leaving the temp state
minTarget, stepDiv uint64
bias time.Duration
}
@@ -157,11 +160,6 @@ func newPriorityPool(ns *nodestate.NodeStateMachine, setup *serverSetup, clock m
func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget uint64, bias time.Duration) uint64 {
pp.lock.Lock()
pp.activeQueue.Refresh()
var updates []capUpdate
defer func() {
pp.lock.Unlock()
pp.updateFlags(updates)
}()

if minTarget < pp.minCap {
minTarget = pp.minCap
@@ -175,43 +173,44 @@ func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget u
c, _ := pp.ns.GetField(node, pp.setup.queueField).(*ppNodeInfo)
if c == nil {
log.Error("requestCapacity called for unknown node", "id", node.ID())
pp.lock.Unlock()
return 0
}
pp.setTempState(c)
if maxTarget > c.capacity {
c.bias = bias
c.stepDiv = pp.fineStepDiv
stepDiv := pp.fineStepDiv
if maxTarget <= c.capacity {
bias, stepDiv = 0, pp.capacityStepDiv
}
pp.setTempState(c, pp.minCap, stepDiv, bias)
pp.setTempCapacity(c, maxTarget)
c.minTarget = minTarget
pp.activeQueue.Remove(c.activeIndex)
pp.inactiveQueue.Remove(c.inactiveIndex)
pp.activeQueue.Push(c)
pp.enforceLimits()
updates = pp.finalizeChanges(c.tempCapacity >= minTarget && c.tempCapacity <= maxTarget && c.tempCapacity != c.capacity)
updates := pp.finalizeChanges(c.tempCapacity >= minTarget && c.tempCapacity <= maxTarget && c.tempCapacity != c.capacity)
pp.lock.Unlock()
pp.updateFlags(updates)
return c.capacity
}

// SetLimits sets the maximum number and total capacity of simultaneously active nodes
func (pp *priorityPool) SetLimits(maxCount, maxCap uint64) {
pp.lock.Lock()
pp.activeQueue.Refresh()
var updates []capUpdate
defer func() {
pp.lock.Unlock()
pp.ns.Operation(func() { pp.updateFlags(updates) })
}()

inc := (maxCount > pp.maxCount) || (maxCap > pp.maxCap)
dec := (maxCount < pp.maxCount) || (maxCap < pp.maxCap)
pp.maxCount, pp.maxCap = maxCount, maxCap

var updates []capUpdate
if dec {
pp.enforceLimits()
updates = pp.finalizeChanges(true)
}
if inc {
updates = append(updates, pp.tryActivate(false)...)
}
pp.lock.Unlock()
pp.ns.Operation(func() { pp.updateFlags(updates) })
}

// setActiveBias sets the bias applied when trying to activate inactive nodes
@@ -291,18 +290,15 @@ func (pp *priorityPool) inactivePriority(p *ppNodeInfo) int64 {
func (pp *priorityPool) connectedNode(c *ppNodeInfo) {
pp.lock.Lock()
pp.activeQueue.Refresh()
var updates []capUpdate
defer func() {
pp.lock.Unlock()
pp.updateFlags(updates)
}()

if c.connected {
pp.lock.Unlock()
return
}
c.connected = true
pp.inactiveQueue.Push(c, pp.inactivePriority(c))
updates = pp.tryActivate(false)
updates := pp.tryActivate(false)
pp.lock.Unlock()
pp.updateFlags(updates)
}

// disconnectedNode is called when a node has been removed from the pool (both inactiveFlag
@@ -311,40 +307,55 @@ func (pp *priorityPool) connectedNode(c *ppNodeInfo) {
func (pp *priorityPool) disconnectedNode(c *ppNodeInfo) {
pp.lock.Lock()
pp.activeQueue.Refresh()
var updates []capUpdate
defer func() {
pp.lock.Unlock()
pp.updateFlags(updates)
}()

if !c.connected {
pp.lock.Unlock()
return
}
c.connected = false
pp.activeQueue.Remove(c.activeIndex)
pp.inactiveQueue.Remove(c.inactiveIndex)

var updates []capUpdate
if c.capacity != 0 {
pp.setTempState(c)
pp.setTempState(c, pp.minCap, pp.capacityStepDiv, 0)
pp.setTempCapacity(c, 0)
updates = pp.tryActivate(true)
}
pp.lock.Unlock()
pp.updateFlags(updates)
}

// setTempState internally puts a node in a temporary state that can either be reverted
// or confirmed later. This temporary state allows changing the capacity of a node and
// moving it between the active and inactive queue. activeFlag/inactiveFlag and
// capacityField are not changed while the changes are still temporary.
func (pp *priorityPool) setTempState(c *ppNodeInfo) {
func (pp *priorityPool) setTempState(c *ppNodeInfo, minTarget uint64, stepDiv uint64, bias time.Duration) {
if c.tempState {
return
}
c.tempState = true
if c.tempCapacity != c.capacity { // should never happen
log.Error("tempCapacity != capacity when entering tempState")
}
c.minTarget = minTarget
c.stepDiv = stepDiv
c.bias = bias
pp.tempState = append(pp.tempState, c)
}

// unsetTempState revokes the temp status of the node and reset all internal
// fields to the default value.
func (pp *priorityPool) unsetTempState(c *ppNodeInfo) {
if !c.tempState {
return
}
c.tempState = false
if c.tempCapacity != c.capacity { // should never happen
log.Error("tempCapacity != capacity when leaving tempState")
}
c.minTarget = pp.minCap
c.stepDiv = pp.capacityStepDiv
pp.tempState = append(pp.tempState, c)
c.bias = 0
}

// setTempCapacity changes the capacity of a node in the temporary state and adjusts
@@ -378,7 +389,7 @@ func (pp *priorityPool) enforceLimits() (*ppNodeInfo, int64) {
)
pp.activeQueue.MultiPop(func(data interface{}, priority int64) bool {
c = data.(*ppNodeInfo)
pp.setTempState(c)
pp.setTempState(c, pp.minCap, pp.capacityStepDiv, 0)
maxActivePriority = priority
if c.tempCapacity == c.minTarget || pp.activeCount > pp.maxCount {
pp.setTempCapacity(c, 0)
@@ -412,10 +423,8 @@ func (pp *priorityPool) finalizeChanges(commit bool) (updates []capUpdate) {
} else {
pp.setTempCapacity(c, c.capacity) // revert activeCount/activeCap
}
c.tempState = false
c.bias = 0
c.stepDiv = pp.capacityStepDiv
c.minTarget = pp.minCap
pp.unsetTempState(c)

if c.connected {
if c.capacity != 0 {
pp.activeQueue.Push(c)
@@ -461,14 +470,12 @@ func (pp *priorityPool) updateFlags(updates []capUpdate) {
func (pp *priorityPool) tryActivate(commit bool) []capUpdate {
for pp.inactiveQueue.Size() > 0 {
c := pp.inactiveQueue.PopItem().(*ppNodeInfo)
pp.setTempState(c)
pp.setTempState(c, pp.minCap, pp.capacityStepDiv, pp.activeBias)
pp.setTempCapacity(c, pp.minCap)
c.bias = pp.activeBias
pp.activeQueue.Push(c)
pp.enforceLimits()
if c.tempCapacity > 0 {
commit = true
c.bias = 0
Contributor:
This is not an identical change and I am not sure about the potential effects. Until now, the bias was only ever applied to the one node that was being inserted or whose capacity was being increased. It is meant to protect against quick oscillations, so we always apply it to the node that is trying to push others out of the pool; once we decide to grant it the space it needs, it is no longer biased. Now tryActivate can activate multiple clients, and in your version the previously activated ones stay biased until the entire operation finishes. This means that further inactive nodes can very easily deactivate recently activated ones when their priorities are close to each other, because both are biased and the already activated one has no advantage over the latest activation attempt. I'd prefer to keep the old and thoroughly tested behavior here.

Member Author:
Gotcha. It's my mistake, will fix it.
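
For context, here is a sketch of how the loop looked before this commit, reconstructed from the lines visible in this diff and adapted to the new setTempState signature; it is not a verbatim copy of the merged code. The point is that the bias is applied only to the node currently competing for a slot and is cleared as soon as that node is granted capacity.

// Inside tryActivate: only the candidate popped from the inactive queue is
// biased, and the bias is dropped once it has secured capacity, so later
// activation attempts cannot easily push it back out.
for pp.inactiveQueue.Size() > 0 {
	c := pp.inactiveQueue.PopItem().(*ppNodeInfo)
	pp.setTempState(c, pp.minCap, pp.capacityStepDiv, pp.activeBias)
	pp.setTempCapacity(c, pp.minCap)
	pp.activeQueue.Push(c)
	pp.enforceLimits()
	if c.tempCapacity > 0 {
		commit = true
		c.bias = 0 // granted: this node no longer competes with a handicap
	} else {
		break
	}
}
// ... remainder of tryActivate unchanged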

} else {
break
}
@@ -483,14 +490,9 @@ func (pp *priorityPool) tryActivate(commit bool) []capUpdate {
func (pp *priorityPool) updatePriority(node *enode.Node) {
pp.lock.Lock()
pp.activeQueue.Refresh()
var updates []capUpdate
defer func() {
pp.lock.Unlock()
pp.updateFlags(updates)
}()

c, _ := pp.ns.GetField(node, pp.setup.queueField).(*ppNodeInfo)
if c == nil || !c.connected {
pp.lock.Unlock()
return
}
pp.activeQueue.Remove(c.activeIndex)
@@ -500,7 +502,9 @@ func (pp *priorityPool) updatePriority(node *enode.Node) {
} else {
pp.inactiveQueue.Push(c, pp.inactivePriority(c))
}
updates = pp.tryActivate(false)
updates := pp.tryActivate(false)
pp.lock.Unlock()
pp.updateFlags(updates)
}

// capacityCurve is a snapshot of the priority pool contents in a format that can efficiently