Skip to content

Commit

Permalink
les/vflux/server: fixed issues in priorityPool
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi authored and karalabe committed Feb 9, 2023
1 parent 0ecff71 commit 4509c1f
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions les/vflux/server/prioritypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget u
}
pp.setTempCapacity(c, maxTarget)
c.minTarget = minTarget
pp.activeQueue.Remove(c.activeIndex)
pp.inactiveQueue.Remove(c.inactiveIndex)
pp.removeFromQueues(c)
pp.activeQueue.Push(c)
pp.enforceLimits()
updates := pp.finalizeChanges(c.tempCapacity >= minTarget && c.tempCapacity <= maxTarget && c.tempCapacity != c.capacity)
Expand Down Expand Up @@ -291,6 +290,16 @@ func (pp *priorityPool) inactivePriority(p *ppNodeInfo) int64 {
return p.nodePriority.priority(pp.minCap)
}

// removeFromQueues removes the node from the active/inactive queues
func (pp *priorityPool) removeFromQueues(c *ppNodeInfo) {
if c.activeIndex >= 0 {
pp.activeQueue.Remove(c.activeIndex)
}
if c.inactiveIndex >= 0 {
pp.inactiveQueue.Remove(c.inactiveIndex)
}
}

// connectNode is called when a new node has been added to the pool (inactiveFlag set)
// Note: this function should run inside a NodeStateMachine operation
func (pp *priorityPool) connectNode(c *ppNodeInfo) {
Expand Down Expand Up @@ -318,8 +327,7 @@ func (pp *priorityPool) disconnectNode(c *ppNodeInfo) {
return
}
c.connected = false
pp.activeQueue.Remove(c.activeIndex)
pp.inactiveQueue.Remove(c.inactiveIndex)
pp.removeFromQueues(c)

var updates []capUpdate
if c.capacity != 0 {
Expand Down Expand Up @@ -409,10 +417,11 @@ func (pp *priorityPool) enforceLimits() (*ppNodeInfo, int64) {
return nil, math.MinInt64
}
var (
c *ppNodeInfo
lastNode *ppNodeInfo
maxActivePriority int64
)
pp.activeQueue.MultiPop(func(c *ppNodeInfo, priority int64) bool {
lastNode = c
pp.setTempState(c)
maxActivePriority = priority
if c.tempCapacity == c.minTarget || pp.activeCount > pp.maxCount {
Expand All @@ -430,7 +439,7 @@ func (pp *priorityPool) enforceLimits() (*ppNodeInfo, int64) {
}
return pp.activeCap > pp.maxCap || pp.activeCount > pp.maxCount
})
return c, invertPriority(maxActivePriority)
return lastNode, invertPriority(maxActivePriority)
}

// finalizeChanges either commits or reverts temporary changes. The necessary capacity
Expand All @@ -439,8 +448,7 @@ func (pp *priorityPool) enforceLimits() (*ppNodeInfo, int64) {
func (pp *priorityPool) finalizeChanges(commit bool) (updates []capUpdate) {
for _, c := range pp.tempState {
// always remove and push back in order to update biased priority
pp.activeQueue.Remove(c.activeIndex)
pp.inactiveQueue.Remove(c.inactiveIndex)
pp.removeFromQueues(c)
oldCapacity := c.capacity
if commit {
c.capacity = c.tempCapacity
Expand Down Expand Up @@ -521,8 +529,7 @@ func (pp *priorityPool) updatePriority(node *enode.Node) {
pp.lock.Unlock()
return
}
pp.activeQueue.Remove(c.activeIndex)
pp.inactiveQueue.Remove(c.inactiveIndex)
pp.removeFromQueues(c)
if c.capacity != 0 {
pp.activeQueue.Push(c)
} else {
Expand Down

0 comments on commit 4509c1f

Please sign in to comment.