Skip to content

Commit

Permalink
les: clientpool updates and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi committed Nov 8, 2019
1 parent 57bb045 commit 9117084
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 17 deletions.
8 changes: 8 additions & 0 deletions les/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ func (bt *balanceTracker) timeUntil(priority int64) (time.Duration, bool) {
return time.Duration(dt), true
}

// setCapacity updates the capacity value used for priority calculation
func (bt *balanceTracker) setCapacity(capacity uint64) {
bt.lock.Lock()
defer bt.lock.Unlock()

bt.capacity = capacity
}

// getPriority returns the actual priority based on the current balance
func (bt *balanceTracker) getPriority(now mclock.AbsTime) int64 {
bt.lock.Lock()
Expand Down
28 changes: 11 additions & 17 deletions les/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,19 +254,7 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) bool {
if !e.priority {
capacity = f.freeClientCap
}
// Ensure the capacity will never lower than the free capacity.
if capacity < f.freeClientCap {
capacity = f.freeClientCap
}
e.capacity = capacity
if e.priority {
f.priorityConnected += capacity
}

// Starts a balance tracker
e.balanceTracker.init(f.clock, capacity)
e.balanceTracker.setBalance(posBalance, negBalance)
e.updatePriceFactors()

// If the number of clients already connected in the clientpool exceeds its
// capacity, evict some clients with lowest priority.
Expand Down Expand Up @@ -310,9 +298,15 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) bool {
f.connectedQueue.Push(e)
f.connectedCap += e.capacity

// Starts a balance tracker
e.balanceTracker.init(f.clock, capacity)
e.balanceTracker.setBalance(posBalance, negBalance)
e.updatePriceFactors()

// If the current client is a paid client, monitor the status of client,
// downgrade it to normal client if positive balance is used up.
if e.priority {
f.priorityConnected += capacity
e.balanceTracker.addCallback(balanceCallbackZero, 0, func() { f.balanceExhausted(id) })
}
// If the capacity of client is not the default value(free capacity), notify
Expand Down Expand Up @@ -495,8 +489,8 @@ func (f *clientPool) setCapacity(c *clientInfo, capacity uint64) error {
oldCapacity := c.capacity
c.capacity = capacity
f.connectedCap += capacity - oldCapacity
f.connectedQueue.Remove(c.queueIndex)
f.connectedQueue.Push(c)
c.balanceTracker.setCapacity(capacity)
f.connectedQueue.Update(c.queueIndex)
if f.connectedCap > f.capLimit {
var kickList []*clientInfo
kick := true
Expand All @@ -516,6 +510,7 @@ func (f *clientPool) setCapacity(c *clientInfo, capacity uint64) error {
}
} else {
c.capacity = oldCapacity
c.balanceTracker.setCapacity(oldCapacity)
for _, c := range kickList {
f.connectedCap += c.capacity
f.connectedQueue.Push(c)
Expand All @@ -525,6 +520,7 @@ func (f *clientPool) setCapacity(c *clientInfo, capacity uint64) error {
}
totalConnectedGauge.Update(int64(f.connectedCap))
f.priorityConnected += capacity - oldCapacity
c.updatePriceFactors()
c.peer.updateCapacity(c.capacity)
return nil
}
Expand Down Expand Up @@ -585,9 +581,7 @@ func (f *clientPool) updateBalance(id enode.ID, amount int64, meta string) error
// but we have no idea about the new capacity, need a second
// call to udpate it.
c.priority = true
if c.priority {
f.priorityConnected += c.capacity
}
f.priorityConnected += c.capacity
c.balanceTracker.addCallback(balanceCallbackZero, 0, func() { f.balanceExhausted(id) })
}
c.balanceMetaInfo = meta
Expand Down

0 comments on commit 9117084

Please sign in to comment.