From b173398a739630f28178e9224812b311512bf7fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= Date: Wed, 13 Nov 2019 23:47:03 +0100 Subject: [PATCH] les: implement server priority API (#20070) This PR implements the LES server RPC API. Methods for server capacity, client balance and client priority management are provided. --- internal/web3ext/web3ext.go | 34 +++++ les/api.go | 281 +++++++++++++++++++++++++++++++++- les/balance.go | 8 + les/clientpool.go | 296 +++++++++++++++++++++++++++--------- les/clientpool_test.go | 54 +++---- les/server.go | 31 ++-- 6 files changed, 597 insertions(+), 107 deletions(-) diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 86e575439294..ac6acc799c8e 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -445,6 +445,11 @@ web3._extend({ params: 2, inputFormatter:[null, null], }), + new web3._extend.Method({ + name: 'freezeClient', + call: 'debug_freezeClient', + params: 1, + }), ], properties: [] }); @@ -798,6 +803,31 @@ web3._extend({ call: 'les_getCheckpoint', params: 1 }), + new web3._extend.Method({ + name: 'clientInfo', + call: 'les_clientInfo', + params: 1 + }), + new web3._extend.Method({ + name: 'priorityClientInfo', + call: 'les_priorityClientInfo', + params: 3 + }), + new web3._extend.Method({ + name: 'setClientParams', + call: 'les_setClientParams', + params: 2 + }), + new web3._extend.Method({ + name: 'setDefaultParams', + call: 'les_setDefaultParams', + params: 1 + }), + new web3._extend.Method({ + name: 'updateBalance', + call: 'les_updateBalance', + params: 3 + }), ], properties: [ @@ -809,6 +839,10 @@ web3._extend({ name: 'checkpointContractAddress', getter: 'les_getCheckpointContractAddress' }), + new web3._extend.Property({ + name: 'serverInfo', + getter: 'les_serverInfo' + }), ] }); ` diff --git a/les/api.go b/les/api.go index bbef771f0445..c417b4ec02d8 100644 --- a/les/api.go +++ b/les/api.go @@ -18,15 +18,292 @@ package les import ( "errors" + "fmt" + "math" + "time" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/p2p/enode" ) var ( - errNoCheckpoint = errors.New("no local checkpoint provided") - errNotActivated = errors.New("checkpoint registrar is not activated") + errNoCheckpoint = errors.New("no local checkpoint provided") + errNotActivated = errors.New("checkpoint registrar is not activated") + errUnknownBenchmarkType = errors.New("unknown benchmark type") + errBalanceOverflow = errors.New("balance overflow") + errNoPriority = errors.New("priority too low to raise capacity") ) +const maxBalance = math.MaxInt64 + +// PrivateLightServerAPI provides an API to access the LES light server. +type PrivateLightServerAPI struct { + server *LesServer + defaultPosFactors, defaultNegFactors priceFactors +} + +// NewPrivateLightServerAPI creates a new LES light server API. +func NewPrivateLightServerAPI(server *LesServer) *PrivateLightServerAPI { + return &PrivateLightServerAPI{ + server: server, + defaultPosFactors: server.clientPool.defaultPosFactors, + defaultNegFactors: server.clientPool.defaultNegFactors, + } +} + +// ServerInfo returns global server parameters +func (api *PrivateLightServerAPI) ServerInfo() map[string]interface{} { + res := make(map[string]interface{}) + res["minimumCapacity"] = api.server.minCapacity + res["maximumCapacity"] = api.server.maxCapacity + res["freeClientCapacity"] = api.server.freeCapacity + res["totalCapacity"], res["totalConnectedCapacity"], res["priorityConnectedCapacity"] = api.server.clientPool.capacityInfo() + return res +} + +// ClientInfo returns information about clients listed in the ids list or matching the given tags +func (api *PrivateLightServerAPI) ClientInfo(ids []enode.ID) map[enode.ID]map[string]interface{} { + res := make(map[enode.ID]map[string]interface{}) + api.server.clientPool.forClients(ids, func(client *clientInfo, id enode.ID) error { + res[id] = api.clientInfo(client, id) + return nil + }) + return res +} + +// PriorityClientInfo returns information about clients with a positive balance +// in the given ID range (stop excluded). If stop is null then the iterator stops +// only at the end of the ID space. MaxCount limits the number of results returned. +// If maxCount limit is applied but there are more potential results then the ID +// of the next potential result is included in the map with an empty structure +// assigned to it. +func (api *PrivateLightServerAPI) PriorityClientInfo(start, stop enode.ID, maxCount int) map[enode.ID]map[string]interface{} { + res := make(map[enode.ID]map[string]interface{}) + ids := api.server.clientPool.ndb.getPosBalanceIDs(start, stop, maxCount+1) + if len(ids) > maxCount { + res[ids[maxCount]] = make(map[string]interface{}) + ids = ids[:maxCount] + } + if len(ids) != 0 { + api.server.clientPool.forClients(ids, func(client *clientInfo, id enode.ID) error { + res[id] = api.clientInfo(client, id) + return nil + }) + } + return res +} + +// clientInfo creates a client info data structure +func (api *PrivateLightServerAPI) clientInfo(c *clientInfo, id enode.ID) map[string]interface{} { + info := make(map[string]interface{}) + if c != nil { + now := mclock.Now() + info["isConnected"] = true + info["connectionTime"] = float64(now-c.connectedAt) / float64(time.Second) + info["capacity"] = c.capacity + pb, nb := c.balanceTracker.getBalance(now) + info["pricing/balance"], info["pricing/negBalance"] = pb, nb + info["pricing/balanceMeta"] = c.balanceMetaInfo + info["priority"] = pb != 0 + } else { + info["isConnected"] = false + pb := api.server.clientPool.getPosBalance(id) + info["pricing/balance"], info["pricing/balanceMeta"] = pb.value, pb.meta + info["priority"] = pb.value != 0 + } + return info +} + +// setParams either sets the given parameters for a single connected client (if specified) +// or the default parameters applicable to clients connected in the future +func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, client *clientInfo, posFactors, negFactors *priceFactors) (updateFactors bool, err error) { + defParams := client == nil + if !defParams { + posFactors, negFactors = &client.posFactors, &client.negFactors + } + for name, value := range params { + errValue := func() error { + return fmt.Errorf("invalid value for parameter '%s'", name) + } + setFactor := func(v *float64) { + if val, ok := value.(float64); ok && val >= 0 { + *v = val / float64(time.Second) + updateFactors = true + } else { + err = errValue() + } + } + + switch { + case name == "pricing/timeFactor": + setFactor(&posFactors.timeFactor) + case name == "pricing/capacityFactor": + setFactor(&posFactors.capacityFactor) + case name == "pricing/requestCostFactor": + setFactor(&posFactors.requestFactor) + case name == "pricing/negative/timeFactor": + setFactor(&negFactors.timeFactor) + case name == "pricing/negative/capacityFactor": + setFactor(&negFactors.capacityFactor) + case name == "pricing/negative/requestCostFactor": + setFactor(&negFactors.requestFactor) + case !defParams && name == "capacity": + if capacity, ok := value.(float64); ok && uint64(capacity) >= api.server.minCapacity { + err = api.server.clientPool.setCapacity(client, uint64(capacity)) + // Don't have to call factor update explicitly. It's already done + // in setCapacity function. + } else { + err = errValue() + } + default: + if defParams { + err = fmt.Errorf("invalid default parameter '%s'", name) + } else { + err = fmt.Errorf("invalid client parameter '%s'", name) + } + } + if err != nil { + return + } + } + return +} + +// UpdateBalance updates the balance of a client (either overwrites it or adds to it). +// It also updates the balance meta info string. +func (api *PrivateLightServerAPI) UpdateBalance(id enode.ID, value int64, meta string) (map[string]uint64, error) { + oldBalance, newBalance, err := api.server.clientPool.updateBalance(id, value, meta) + m := make(map[string]uint64) + m["old"] = oldBalance + m["new"] = newBalance + return m, err +} + +// SetClientParams sets client parameters for all clients listed in the ids list +// or all connected clients if the list is empty +func (api *PrivateLightServerAPI) SetClientParams(ids []enode.ID, params map[string]interface{}) error { + return api.server.clientPool.forClients(ids, func(client *clientInfo, id enode.ID) error { + if client != nil { + update, err := api.setParams(params, client, nil, nil) + if update { + client.updatePriceFactors() + } + return err + } else { + return fmt.Errorf("client %064x is not connected", id[:]) + } + }) +} + +// SetDefaultParams sets the default parameters applicable to clients connected in the future +func (api *PrivateLightServerAPI) SetDefaultParams(params map[string]interface{}) error { + update, err := api.setParams(params, nil, &api.defaultPosFactors, &api.defaultNegFactors) + if update { + api.server.clientPool.setDefaultFactors(api.defaultPosFactors, api.defaultNegFactors) + } + return err +} + +// Benchmark runs a request performance benchmark with a given set of measurement setups +// in multiple passes specified by passCount. The measurement time for each setup in each +// pass is specified in milliseconds by length. +// +// Note: measurement time is adjusted for each pass depending on the previous ones. +// Therefore a controlled total measurement time is achievable in multiple passes. +func (api *PrivateLightServerAPI) Benchmark(setups []map[string]interface{}, passCount, length int) ([]map[string]interface{}, error) { + benchmarks := make([]requestBenchmark, len(setups)) + for i, setup := range setups { + if t, ok := setup["type"].(string); ok { + getInt := func(field string, def int) int { + if value, ok := setup[field].(float64); ok { + return int(value) + } + return def + } + getBool := func(field string, def bool) bool { + if value, ok := setup[field].(bool); ok { + return value + } + return def + } + switch t { + case "header": + benchmarks[i] = &benchmarkBlockHeaders{ + amount: getInt("amount", 1), + skip: getInt("skip", 1), + byHash: getBool("byHash", false), + reverse: getBool("reverse", false), + } + case "body": + benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: false} + case "receipts": + benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: true} + case "proof": + benchmarks[i] = &benchmarkProofsOrCode{code: false} + case "code": + benchmarks[i] = &benchmarkProofsOrCode{code: true} + case "cht": + benchmarks[i] = &benchmarkHelperTrie{ + bloom: false, + reqCount: getInt("amount", 1), + } + case "bloom": + benchmarks[i] = &benchmarkHelperTrie{ + bloom: true, + reqCount: getInt("amount", 1), + } + case "txSend": + benchmarks[i] = &benchmarkTxSend{} + case "txStatus": + benchmarks[i] = &benchmarkTxStatus{} + default: + return nil, errUnknownBenchmarkType + } + } else { + return nil, errUnknownBenchmarkType + } + } + rs := api.server.handler.runBenchmark(benchmarks, passCount, time.Millisecond*time.Duration(length)) + result := make([]map[string]interface{}, len(setups)) + for i, r := range rs { + res := make(map[string]interface{}) + if r.err == nil { + res["totalCount"] = r.totalCount + res["avgTime"] = r.avgTime + res["maxInSize"] = r.maxInSize + res["maxOutSize"] = r.maxOutSize + } else { + res["error"] = r.err.Error() + } + result[i] = res + } + return result, nil +} + +// PrivateDebugAPI provides an API to debug LES light server functionality. +type PrivateDebugAPI struct { + server *LesServer +} + +// NewPrivateDebugAPI creates a new LES light server debug API. +func NewPrivateDebugAPI(server *LesServer) *PrivateDebugAPI { + return &PrivateDebugAPI{ + server: server, + } +} + +// FreezeClient forces a temporary client freeze which normally happens when the server is overloaded +func (api *PrivateDebugAPI) FreezeClient(id enode.ID) error { + return api.server.clientPool.forClients([]enode.ID{id}, func(c *clientInfo, id enode.ID) error { + if c == nil { + return fmt.Errorf("client %064x is not connected", id[:]) + } + c.peer.freezeClient() + return nil + }) +} + // PrivateLightAPI provides an API to access the LES light server or light client. type PrivateLightAPI struct { backend *lesCommons diff --git a/les/balance.go b/les/balance.go index 2813db01c5c1..6d1b20c0156b 100644 --- a/les/balance.go +++ b/les/balance.go @@ -160,6 +160,14 @@ func (bt *balanceTracker) timeUntil(priority int64) (time.Duration, bool) { return time.Duration(dt), true } +// setCapacity updates the capacity value used for priority calculation +func (bt *balanceTracker) setCapacity(capacity uint64) { + bt.lock.Lock() + defer bt.lock.Unlock() + + bt.capacity = capacity +} + // getPriority returns the actual priority based on the current balance func (bt *balanceTracker) getPriority(now mclock.AbsTime) int64 { bt.lock.Lock() diff --git a/les/clientpool.go b/les/clientpool.go index 0b4d1b9612e6..0169dc706b99 100644 --- a/les/clientpool.go +++ b/les/clientpool.go @@ -17,7 +17,9 @@ package les import ( + "bytes" "encoding/binary" + "fmt" "io" "math" "sync" @@ -83,15 +85,16 @@ type clientPool struct { connectedMap map[enode.ID]*clientInfo connectedQueue *prque.LazyQueue - posFactors, negFactors priceFactors + defaultPosFactors, defaultNegFactors priceFactors - connLimit int // The maximum number of connections that clientpool can support - capLimit uint64 // The maximum cumulative capacity that clientpool can support - connectedCap uint64 // The sum of the capacity of the current clientpool connected - freeClientCap uint64 // The capacity value of each free client - startTime mclock.AbsTime // The timestamp at which the clientpool started running - cumulativeTime int64 // The cumulative running time of clientpool at the start point. - disableBias bool // Disable connection bias(used in testing) + connLimit int // The maximum number of connections that clientpool can support + capLimit uint64 // The maximum cumulative capacity that clientpool can support + connectedCap uint64 // The sum of the capacity of the current clientpool connected + priorityConnected uint64 // The sum of the capacity of currently connected priority clients + freeClientCap uint64 // The capacity value of each free client + startTime mclock.AbsTime // The timestamp at which the clientpool started running + cumulativeTime int64 // The cumulative running time of clientpool at the start point. + disableBias bool // Disable connection bias(used in testing) } // clientPeer represents a client in the pool. @@ -103,18 +106,22 @@ type clientPeer interface { ID() enode.ID freeClientId() string updateCapacity(uint64) + freezeClient() } // clientInfo represents a connected client type clientInfo struct { - address string - id enode.ID - capacity uint64 - priority bool - pool *clientPool - peer clientPeer - queueIndex int // position in connectedQueue - balanceTracker balanceTracker + address string + id enode.ID + connectedAt mclock.AbsTime + capacity uint64 + priority bool + pool *clientPool + peer clientPeer + queueIndex int // position in connectedQueue + balanceTracker balanceTracker + posFactors, negFactors priceFactors + balanceMetaInfo string } // connSetIndex callback updates clientInfo item index in connectedQueue @@ -223,20 +230,26 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) bool { ) pb := f.ndb.getOrNewPB(id) posBalance = pb.value - e := &clientInfo{pool: f, peer: peer, address: freeID, queueIndex: -1, id: id, priority: posBalance != 0} nb := f.ndb.getOrNewNB(freeID) if nb.logValue != 0 { - negBalance = uint64(math.Exp(float64(nb.logValue-f.logOffset(now)) / fixedPointMultiplier)) - negBalance *= uint64(time.Second) + negBalance = uint64(math.Exp(float64(nb.logValue-f.logOffset(now))/fixedPointMultiplier) * float64(time.Second)) + } + e := &clientInfo{ + pool: f, + peer: peer, + address: freeID, + queueIndex: -1, + id: id, + connectedAt: now, + priority: posBalance != 0, + posFactors: f.defaultPosFactors, + negFactors: f.defaultNegFactors, + balanceMetaInfo: pb.meta, } // If the client is a free client, assign with a low free capacity, // Otherwise assign with the given value(priority client) - if !e.priority { - capacity = f.freeClientCap - } - // Ensure the capacity will never lower than the free capacity. - if capacity < f.freeClientCap { + if !e.priority || capacity == 0 { capacity = f.freeClientCap } e.capacity = capacity @@ -244,7 +257,7 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) bool { // Starts a balance tracker e.balanceTracker.init(f.clock, capacity) e.balanceTracker.setBalance(posBalance, negBalance) - f.setClientPriceFactors(e) + e.updatePriceFactors() // If the number of clients already connected in the clientpool exceeds its // capacity, evict some clients with lowest priority. @@ -283,6 +296,7 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) bool { f.dropClient(c, now, true) } } + // Register new client to connection queue. f.connectedMap[id] = e f.connectedQueue.Push(e) @@ -291,6 +305,7 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) bool { // If the current client is a paid client, monitor the status of client, // downgrade it to normal client if positive balance is used up. if e.priority { + f.priorityConnected += capacity e.balanceTracker.addCallback(balanceCallbackZero, 0, func() { f.balanceExhausted(id) }) } // If the capacity of client is not the default value(free capacity), notify @@ -324,6 +339,38 @@ func (f *clientPool) disconnect(p clientPeer) { f.dropClient(e, f.clock.Now(), false) } +// forClients iterates through a list of clients, calling the callback for each one. +// If a client is not connected then clientInfo is nil. If the specified list is empty +// then the callback is called for all connected clients. +func (f *clientPool) forClients(ids []enode.ID, callback func(*clientInfo, enode.ID) error) error { + f.lock.Lock() + defer f.lock.Unlock() + + if len(ids) > 0 { + for _, id := range ids { + if err := callback(f.connectedMap[id], id); err != nil { + return err + } + } + } else { + for _, c := range f.connectedMap { + if err := callback(c, c.id); err != nil { + return err + } + } + } + return nil +} + +// setDefaultFactors sets the default price factors applied to subsequently connected clients +func (f *clientPool) setDefaultFactors(posFactors, negFactors priceFactors) { + f.lock.Lock() + defer f.lock.Unlock() + + f.defaultPosFactors = posFactors + f.defaultNegFactors = negFactors +} + // dropClient removes a client from the connected queue and finalizes its balance. // If kick is true then it also initiates the disconnection. func (f *clientPool) dropClient(e *clientInfo, now mclock.AbsTime, kick bool) { @@ -334,6 +381,9 @@ func (f *clientPool) dropClient(e *clientInfo, now mclock.AbsTime, kick bool) { f.connectedQueue.Remove(e.queueIndex) delete(f.connectedMap, e.id) f.connectedCap -= e.capacity + if e.priority { + f.priorityConnected -= e.capacity + } totalConnectedGauge.Update(int64(f.connectedCap)) if kick { clientKickedMeter.Mark(1) @@ -345,6 +395,15 @@ func (f *clientPool) dropClient(e *clientInfo, now mclock.AbsTime, kick bool) { } } +// capacityInfo returns the total capacity allowance, the total capacity of connected +// clients and the total capacity of connected and prioritized clients +func (f *clientPool) capacityInfo() (uint64, uint64, uint64) { + f.lock.Lock() + defer f.lock.Unlock() + + return f.capLimit, f.connectedCap, f.priorityConnected +} + // finalizeBalance stops the balance tracker, retrieves the final balances and // stores them in posBalanceQueue and negBalanceQueue func (f *clientPool) finalizeBalance(c *clientInfo, now mclock.AbsTime) { @@ -374,14 +433,20 @@ func (f *clientPool) balanceExhausted(id enode.ID) { if c == nil || !c.priority { return } + if c.priority { + f.priorityConnected -= c.capacity + } c.priority = false if c.capacity != f.freeClientCap { f.connectedCap += f.freeClientCap - c.capacity totalConnectedGauge.Update(int64(f.connectedCap)) c.capacity = f.freeClientCap + c.balanceTracker.setCapacity(c.capacity) c.peer.updateCapacity(c.capacity) } - f.ndb.delPB(id) + pb := f.ndb.getOrNewPB(id) + pb.value = 0 + f.ndb.setPB(id, pb) } // setConnLimit sets the maximum number and total capacity of connected clients, @@ -400,6 +465,56 @@ func (f *clientPool) setLimits(totalConn int, totalCap uint64) { } } +// setCapacity sets the assigned capacity of a connected client +func (f *clientPool) setCapacity(c *clientInfo, capacity uint64) error { + if f.connectedMap[c.id] != c { + return fmt.Errorf("client %064x is not connected", c.id[:]) + } + if c.capacity == capacity { + return nil + } + if !c.priority { + return errNoPriority + } + oldCapacity := c.capacity + c.capacity = capacity + f.connectedCap += capacity - oldCapacity + c.balanceTracker.setCapacity(capacity) + f.connectedQueue.Update(c.queueIndex) + if f.connectedCap > f.capLimit { + var kickList []*clientInfo + kick := true + f.connectedQueue.MultiPop(func(data interface{}, priority int64) bool { + client := data.(*clientInfo) + kickList = append(kickList, client) + f.connectedCap -= client.capacity + if client == c { + kick = false + } + return kick && (f.connectedCap > f.capLimit) + }) + if kick { + now := mclock.Now() + for _, c := range kickList { + f.dropClient(c, now, true) + } + } else { + c.capacity = oldCapacity + c.balanceTracker.setCapacity(oldCapacity) + for _, c := range kickList { + f.connectedCap += c.capacity + f.connectedQueue.Push(c) + } + return errNoPriority + } + } + totalConnectedGauge.Update(int64(f.connectedCap)) + f.priorityConnected += capacity - oldCapacity + c.updatePriceFactors() + c.peer.updateCapacity(c.capacity) + return nil +} + // requestCost feeds request cost after serving a request from the given peer. func (f *clientPool) requestCost(p *peer, cost uint64) { f.lock.Lock() @@ -424,83 +539,86 @@ func (f *clientPool) logOffset(now mclock.AbsTime) int64 { return f.cumulativeTime + cumulativeTime } -// setPriceFactors changes pricing factors for both positive and negative balances. -// Applies to connected clients and also future connections. -func (f *clientPool) setPriceFactors(posFactors, negFactors priceFactors) { +// setClientPriceFactors sets the pricing factors for an individual connected client +func (c *clientInfo) updatePriceFactors() { + c.balanceTracker.setFactors(true, c.negFactors.timeFactor+float64(c.capacity)*c.negFactors.capacityFactor/1000000, c.negFactors.requestFactor) + c.balanceTracker.setFactors(false, c.posFactors.timeFactor+float64(c.capacity)*c.posFactors.capacityFactor/1000000, c.posFactors.requestFactor) +} + +// getPosBalance retrieves a single positive balance entry from cache or the database +func (f *clientPool) getPosBalance(id enode.ID) posBalance { f.lock.Lock() defer f.lock.Unlock() - f.posFactors, f.negFactors = posFactors, negFactors - for _, c := range f.connectedMap { - f.setClientPriceFactors(c) - } -} - -// setClientPriceFactors sets the pricing factors for an individual connected client -func (f *clientPool) setClientPriceFactors(c *clientInfo) { - c.balanceTracker.setFactors(true, f.negFactors.timeFactor+float64(c.capacity)*f.negFactors.capacityFactor/1000000, f.negFactors.requestFactor) - c.balanceTracker.setFactors(false, f.posFactors.timeFactor+float64(c.capacity)*f.posFactors.capacityFactor/1000000, f.posFactors.requestFactor) + return f.ndb.getOrNewPB(id) } -// addBalance updates the positive balance of a client. -// If setTotal is false then the given amount is added to the balance. -// If setTotal is true then amount represents the total amount ever added to the -// given ID and positive balance is increased by (amount-lastTotal) while lastTotal -// is updated to amount. This method also allows removing positive balance. -func (f *clientPool) addBalance(id enode.ID, amount uint64, setTotal bool) { +// updateBalance updates the balance of a client (either overwrites it or adds to it). +// It also updates the balance meta info string. +func (f *clientPool) updateBalance(id enode.ID, amount int64, meta string) (uint64, uint64, error) { f.lock.Lock() defer f.lock.Unlock() pb := f.ndb.getOrNewPB(id) + var negBalance uint64 c := f.connectedMap[id] if c != nil { - posBalance, negBalance := c.balanceTracker.getBalance(f.clock.Now()) - pb.value = posBalance - defer func() { - c.balanceTracker.setBalance(pb.value, negBalance) - if !c.priority && pb.value > 0 { - // The capacity should be adjusted based on the requirement, - // but we have no idea about the new capacity, need a second - // call to udpate it. - c.priority = true - c.balanceTracker.addCallback(balanceCallbackZero, 0, func() { f.balanceExhausted(id) }) - } - }() + pb.value, negBalance = c.balanceTracker.getBalance(f.clock.Now()) } - if setTotal { - if pb.value+amount > pb.lastTotal { - pb.value += amount - pb.lastTotal - } else { - pb.value = 0 + oldBalance := pb.value + if amount > 0 { + if amount > maxBalance || pb.value > maxBalance-uint64(amount) { + return oldBalance, oldBalance, errBalanceOverflow } - pb.lastTotal = amount + pb.value += uint64(amount) } else { - pb.value += amount - pb.lastTotal += amount + if uint64(-amount) > pb.value { + pb.value = 0 + } else { + pb.value -= uint64(-amount) + } } + pb.meta = meta f.ndb.setPB(id, pb) + if c != nil { + c.balanceTracker.setBalance(pb.value, negBalance) + if !c.priority && pb.value > 0 { + // The capacity should be adjusted based on the requirement, + // but we have no idea about the new capacity, need a second + // call to udpate it. + c.priority = true + f.priorityConnected += c.capacity + c.balanceTracker.addCallback(balanceCallbackZero, 0, func() { f.balanceExhausted(id) }) + } + // if balance is set to zero then reverting to non-priority status + // is handled by the balanceExhausted callback + c.balanceMetaInfo = meta + } + return oldBalance, pb.value, nil } // posBalance represents a recently accessed positive balance entry type posBalance struct { - value, lastTotal uint64 + value uint64 + meta string } // EncodeRLP implements rlp.Encoder func (e *posBalance) EncodeRLP(w io.Writer) error { - return rlp.Encode(w, []interface{}{e.value, e.lastTotal}) + return rlp.Encode(w, []interface{}{e.value, e.meta}) } // DecodeRLP implements rlp.Decoder func (e *posBalance) DecodeRLP(s *rlp.Stream) error { var entry struct { - Value, LastTotal uint64 + Value uint64 + Meta string } if err := s.Decode(&entry); err != nil { return err } e.value = entry.Value - e.lastTotal = entry.LastTotal + e.meta = entry.Meta return nil } @@ -526,7 +644,10 @@ func (e *negBalance) DecodeRLP(s *rlp.Stream) error { const ( // nodeDBVersion is the version identifier of the node data in db - nodeDBVersion = 0 + // + // Changelog: + // * Replace `lastTotal` with `meta` in positive balance: version 0=>1 + nodeDBVersion = 1 // dbCleanupCycle is the cycle of db for useless data cleanup dbCleanupCycle = time.Hour @@ -614,6 +735,10 @@ func (db *nodeDB) getOrNewPB(id enode.ID) posBalance { } func (db *nodeDB) setPB(id enode.ID, b posBalance) { + if b.value == 0 && len(b.meta) == 0 { + db.delPB(id) + return + } key := db.key(id.Bytes(), false) enc, err := rlp.EncodeToBytes(&(b)) if err != nil { @@ -630,6 +755,37 @@ func (db *nodeDB) delPB(id enode.ID) { db.pcache.Remove(string(key)) } +// getPosBalanceIDs returns a lexicographically ordered list of IDs of accounts +// with a positive balance +func (db *nodeDB) getPosBalanceIDs(start, stop enode.ID, maxCount int) (result []enode.ID) { + if maxCount <= 0 { + return + } + it := db.db.NewIteratorWithStart(db.key(start.Bytes(), false)) + defer it.Release() + for i := len(stop[:]) - 1; i >= 0; i-- { + stop[i]-- + if stop[i] != 255 { + break + } + } + stopKey := db.key(stop.Bytes(), false) + keyLen := len(stopKey) + + for it.Next() { + var id enode.ID + if len(it.Key()) != keyLen || bytes.Compare(it.Key(), stopKey) == 1 { + return + } + copy(id[:], it.Key()[keyLen-len(id):]) + result = append(result, id) + if len(result) == maxCount { + return + } + } + return +} + func (db *nodeDB) getOrNewNB(id string) negBalance { key := db.key([]byte(id), true) item, exist := db.ncache.Get(string(key)) diff --git a/les/clientpool_test.go b/les/clientpool_test.go index 53973696caff..8b990e1dc2b8 100644 --- a/les/clientpool_test.go +++ b/les/clientpool_test.go @@ -76,6 +76,8 @@ type poolTestPeerWithCap struct { func (i *poolTestPeerWithCap) updateCapacity(cap uint64) { i.cap = cap } +func (i poolTestPeer) freezeClient() {} + func testClientPool(t *testing.T, connLimit, clientCount, paidCount int, randomDisconnect bool) { rand.Seed(time.Now().UnixNano()) var ( @@ -91,7 +93,7 @@ func testClientPool(t *testing.T, connLimit, clientCount, paidCount int, randomD ) pool.disableBias = true pool.setLimits(connLimit, uint64(connLimit)) - pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) + pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) // pool should accept new peers up to its connected limit for i := 0; i < connLimit; i++ { @@ -107,9 +109,9 @@ func testClientPool(t *testing.T, connLimit, clientCount, paidCount int, randomD if tickCounter == testClientPoolTicks/4 { // give a positive balance to some of the peers - amount := uint64(testClientPoolTicks / 2 * 1000000000) // enough for half of the simulation period + amount := testClientPoolTicks / 2 * int64(time.Second) // enough for half of the simulation period for i := 0; i < paidCount; i++ { - pool.addBalance(poolTestPeer(i).ID(), amount, false) + pool.updateBalance(poolTestPeer(i).ID(), amount, "") } } @@ -173,10 +175,10 @@ func TestConnectPaidClient(t *testing.T) { pool := newClientPool(db, 1, &clock, nil) defer pool.stop() pool.setLimits(10, uint64(10)) - pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) + pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) // Add balance for an external client and mark it as paid client - pool.addBalance(poolTestPeer(0).ID(), 1000, false) + pool.updateBalance(poolTestPeer(0).ID(), 1000, "") if !pool.connect(poolTestPeer(0), 10) { t.Fatalf("Failed to connect paid client") @@ -191,10 +193,10 @@ func TestConnectPaidClientToSmallPool(t *testing.T) { pool := newClientPool(db, 1, &clock, nil) defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 - pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) + pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) // Add balance for an external client and mark it as paid client - pool.addBalance(poolTestPeer(0).ID(), 1000, false) + pool.updateBalance(poolTestPeer(0).ID(), 1000, "") // Connect a fat paid client to pool, should reject it. if pool.connect(poolTestPeer(0), 100) { @@ -211,18 +213,18 @@ func TestConnectPaidClientToFullPool(t *testing.T) { pool := newClientPool(db, 1, &clock, removeFn) defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 - pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) + pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) for i := 0; i < 10; i++ { - pool.addBalance(poolTestPeer(i).ID(), 1000000000, false) + pool.updateBalance(poolTestPeer(i).ID(), 1000000000, "") pool.connect(poolTestPeer(i), 1) } - pool.addBalance(poolTestPeer(11).ID(), 1000, false) // Add low balance to new paid client + pool.updateBalance(poolTestPeer(11).ID(), 1000, "") // Add low balance to new paid client if pool.connect(poolTestPeer(11), 1) { t.Fatalf("Low balance paid client should be rejected") } clock.Run(time.Second) - pool.addBalance(poolTestPeer(12).ID(), 1000000000*60*3, false) // Add high balance to new paid client + pool.updateBalance(poolTestPeer(12).ID(), 1000000000*60*3, "") // Add high balance to new paid client if !pool.connect(poolTestPeer(12), 1) { t.Fatalf("High balance paid client should be accpected") } @@ -238,10 +240,10 @@ func TestPaidClientKickedOut(t *testing.T) { pool := newClientPool(db, 1, &clock, removeFn) defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 - pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) + pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) for i := 0; i < 10; i++ { - pool.addBalance(poolTestPeer(i).ID(), 1000000000, false) // 1 second allowance + pool.updateBalance(poolTestPeer(i).ID(), 1000000000, "") // 1 second allowance pool.connect(poolTestPeer(i), 1) clock.Run(time.Millisecond) } @@ -268,7 +270,7 @@ func TestConnectFreeClient(t *testing.T) { pool := newClientPool(db, 1, &clock, nil) defer pool.stop() pool.setLimits(10, uint64(10)) - pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) + pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) if !pool.connect(poolTestPeer(0), 10) { t.Fatalf("Failed to connect free client") } @@ -283,7 +285,7 @@ func TestConnectFreeClientToFullPool(t *testing.T) { pool := newClientPool(db, 1, &clock, removeFn) defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 - pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) + pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) for i := 0; i < 10; i++ { pool.connect(poolTestPeer(i), 1) @@ -312,7 +314,7 @@ func TestFreeClientKickedOut(t *testing.T) { pool := newClientPool(db, 1, &clock, removeFn) defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 - pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) + pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) for i := 0; i < 10; i++ { pool.connect(poolTestPeer(i), 1) @@ -347,9 +349,9 @@ func TestPositiveBalanceCalculation(t *testing.T) { pool := newClientPool(db, 1, &clock, removeFn) defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 - pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) + pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) - pool.addBalance(poolTestPeer(0).ID(), uint64(time.Minute*3), false) + pool.updateBalance(poolTestPeer(0).ID(), int64(time.Minute*3), "") pool.connect(poolTestPeer(0), 10) clock.Run(time.Minute) @@ -370,12 +372,12 @@ func TestDowngradePriorityClient(t *testing.T) { pool := newClientPool(db, 1, &clock, removeFn) defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 - pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) + pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) p := &poolTestPeerWithCap{ poolTestPeer: poolTestPeer(0), } - pool.addBalance(p.ID(), uint64(time.Minute), false) + pool.updateBalance(p.ID(), int64(time.Minute), "") pool.connect(p, 10) if p.cap != 10 { t.Fatalf("The capcacity of priority peer hasn't been updated, got: %d", p.cap) @@ -391,7 +393,7 @@ func TestDowngradePriorityClient(t *testing.T) { t.Fatalf("Positive balance mismatch, want %v, got %v", 0, pb.value) } - pool.addBalance(poolTestPeer(0).ID(), uint64(time.Minute), false) + pool.updateBalance(poolTestPeer(0).ID(), int64(time.Minute), "") pb = pool.ndb.getOrNewPB(poolTestPeer(0).ID()) if pb.value != uint64(time.Minute) { t.Fatalf("Positive balance mismatch, want %v, got %v", uint64(time.Minute), pb.value) @@ -408,7 +410,7 @@ func TestNegativeBalanceCalculation(t *testing.T) { pool := newClientPool(db, 1, &clock, removeFn) defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 - pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) + pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) for i := 0; i < 10; i++ { pool.connect(poolTestPeer(i), 1) @@ -442,8 +444,8 @@ func TestNodeDB(t *testing.T) { ndb := newNodeDB(rawdb.NewMemoryDatabase(), mclock.System{}) defer ndb.close() - if !bytes.Equal(ndb.verbuf[:], []byte{0x00, 0x00}) { - t.Fatalf("version buffer mismatch, want %v, got %v", []byte{0x00, 0x00}, ndb.verbuf) + if !bytes.Equal(ndb.verbuf[:], []byte{0x00, nodeDBVersion}) { + t.Fatalf("version buffer mismatch, want %v, got %v", []byte{0x00, nodeDBVersion}, ndb.verbuf) } var cases = []struct { id enode.ID @@ -451,8 +453,8 @@ func TestNodeDB(t *testing.T) { balance interface{} positive bool }{ - {enode.ID{0x00, 0x01, 0x02}, "", posBalance{value: 100, lastTotal: 200}, true}, - {enode.ID{0x00, 0x01, 0x02}, "", posBalance{value: 200, lastTotal: 300}, true}, + {enode.ID{0x00, 0x01, 0x02}, "", posBalance{value: 100}, true}, + {enode.ID{0x00, 0x01, 0x02}, "", posBalance{value: 200}, true}, {enode.ID{}, "127.0.0.1", negBalance{logValue: 10}, false}, {enode.ID{}, "127.0.0.1", negBalance{logValue: 20}, false}, } diff --git a/les/server.go b/les/server.go index 997a24191bf5..e68903dd81da 100644 --- a/les/server.go +++ b/les/server.go @@ -50,9 +50,9 @@ type LesServer struct { servingQueue *servingQueue clientPool *clientPool - freeCapacity uint64 // The minimal client capacity used for free client. - threadsIdle int // Request serving threads count when system is idle. - threadsBusy int // Request serving threads count when system is busy(block insertion). + minCapacity, maxCapacity, freeCapacity uint64 + threadsIdle int // Request serving threads count when system is idle. + threadsBusy int // Request serving threads count when system is busy(block insertion). } func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) { @@ -88,7 +88,8 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) { threadsIdle: threads, } srv.handler = newServerHandler(srv, e.BlockChain(), e.ChainDb(), e.TxPool(), e.Synced) - srv.costTracker, srv.freeCapacity = newCostTracker(e.ChainDb(), config) + srv.costTracker, srv.minCapacity = newCostTracker(e.ChainDb(), config) + srv.freeCapacity = srv.minCapacity // Set up checkpoint oracle. oracle := config.CheckpointOracle @@ -108,13 +109,13 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) { // to send requests most of the time. Our goal is to serve as many clients as // possible while the actually used server capacity does not exceed the limits totalRecharge := srv.costTracker.totalRecharge() - maxCapacity := srv.freeCapacity * uint64(srv.config.LightPeers) - if totalRecharge > maxCapacity { - maxCapacity = totalRecharge + srv.maxCapacity = srv.freeCapacity * uint64(srv.config.LightPeers) + if totalRecharge > srv.maxCapacity { + srv.maxCapacity = totalRecharge } - srv.fcManager.SetCapacityLimits(srv.freeCapacity, maxCapacity, srv.freeCapacity*2) + srv.fcManager.SetCapacityLimits(srv.freeCapacity, srv.maxCapacity, srv.freeCapacity*2) srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, mclock.System{}, func(id enode.ID) { go srv.peers.Unregister(peerIdToString(id)) }) - srv.clientPool.setPriceFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1}) + srv.clientPool.setDefaultFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1}) checkpoint := srv.latestLocalCheckpoint() if !checkpoint.Empty() { @@ -133,6 +134,18 @@ func (s *LesServer) APIs() []rpc.API { Service: NewPrivateLightAPI(&s.lesCommons), Public: false, }, + { + Namespace: "les", + Version: "1.0", + Service: NewPrivateLightServerAPI(s), + Public: false, + }, + { + Namespace: "debug", + Version: "1.0", + Service: NewPrivateDebugAPI(s), + Public: false, + }, } }