Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Adjust requests costs for light client #9925

Merged
merged 11 commits into from
Nov 21, 2018
88 changes: 80 additions & 8 deletions ethcore/light/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use parking_lot::{Mutex, RwLock};
use provider::Provider;
use request::{Request, NetworkRequests as Requests, Response};
use rlp::{RlpStream, Rlp};
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use std::ops::{BitOr, BitAnd, Not};
use std::sync::Arc;
Expand All @@ -38,7 +38,7 @@ use std::time::{Duration, Instant};
use self::request_credits::{Credits, FlowParams};
use self::context::{Ctx, TickCtx};
use self::error::Punishment;
use self::load_timer::{LoadDistribution, NullStore};
use self::load_timer::{LoadDistribution, NullStore, MOVING_SAMPLE_SIZE};
use self::request_set::RequestSet;
use self::id_guard::IdGuard;

Expand Down Expand Up @@ -70,6 +70,16 @@ const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3;
const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);

const STATISTICS_TIMEOUT: TimerToken = 4;
const STATISTICS_INTERVAL: Duration = Duration::from_secs(15);

/// Maximum load share for the light server
pub const MAX_LIGHTSERV_LOAD: f64 = 0.5;

/// Factor to multiply leecher count to cater for
/// extra sudden connections (should be >= 1.0)
pub const LEECHER_COUNT_FACTOR: f64 = 1.25;

// minimum interval between updates.
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);

Expand Down Expand Up @@ -256,18 +266,18 @@ pub trait Handler: Send + Sync {
pub struct Config {
/// How many stored seconds of credits peers should be able to accumulate.
pub max_stored_seconds: u64,
/// How much of the total load capacity each peer should be allowed to take.
pub load_share: f64,
/// The network config median peers (used as default peer count)
pub median_peers: f64,
}

impl Default for Config {
fn default() -> Self {
const LOAD_SHARE: f64 = 1.0 / 25.0;
const MEDIAN_PEERS: f64 = 25.0;
const MAX_ACCUMULATED: u64 = 60 * 5; // only charge for 5 minutes.

Config {
max_stored_seconds: MAX_ACCUMULATED,
load_share: LOAD_SHARE,
median_peers: MEDIAN_PEERS,
}
}
}
Expand Down Expand Up @@ -335,6 +345,42 @@ mod id_guard {
}
}

/// Provides various statistics that could
/// be used to compute costs
pub struct Statistics {
/// Samples of peer count
peer_counts: VecDeque<usize>,
}

impl Statistics {
/// Create a new Statistics instance
pub fn new() -> Self {
Statistics {
peer_counts: VecDeque::with_capacity(MOVING_SAMPLE_SIZE),
}
}

/// Add a new peer_count sample
pub fn add_peer_count(&mut self, peer_count: usize) {
while self.peer_counts.len() >= MOVING_SAMPLE_SIZE {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be simplified by using a fixed size array/vector (initialized wit 1s) and index: usize that wraps after the end

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, but the current implementation is more accurate in case when the number of samples is less than MOVING_SAMPLE_SIZE (it would be full only after an hour (256 * 15 / 60)).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, although that could be simple solved with a flag (i.e. is_fully_filled ? 0..len : 0..index).

self.peer_counts.pop_front();
}
self.peer_counts.push_back(peer_count);
}

/// Get the average peer count from previous samples. Is always >= 1.0
pub fn avg_peer_count(&self) -> f64 {
let len = self.peer_counts.len();
if len == 0 {
return 1.0;
}
let avg = self.peer_counts.iter()
.fold(0, |sum: u32, &v| sum.saturating_add(v as u32)) as f64
/ len as f64;
avg.max(1.0)
}
}

/// This is an implementation of the light ethereum network protocol, abstracted
/// over a `Provider` of data and a p2p network.
///
Expand All @@ -359,6 +405,7 @@ pub struct LightProtocol {
req_id: AtomicUsize,
sample_store: Box<SampleStore>,
load_distribution: LoadDistribution,
statistics: RwLock<Statistics>,
}

impl LightProtocol {
Expand All @@ -369,9 +416,11 @@ impl LightProtocol {
let genesis_hash = provider.chain_info().genesis_hash;
let sample_store = params.sample_store.unwrap_or_else(|| Box::new(NullStore));
let load_distribution = LoadDistribution::load(&*sample_store);
// Default load share relative to median peers
let load_share = MAX_LIGHTSERV_LOAD / params.config.median_peers;
let flow_params = FlowParams::from_request_times(
|kind| load_distribution.expected_time(kind),
params.config.load_share,
load_share,
Duration::from_secs(params.config.max_stored_seconds),
);

Expand All @@ -389,6 +438,7 @@ impl LightProtocol {
req_id: AtomicUsize::new(0),
sample_store,
load_distribution,
statistics: RwLock::new(Statistics::new()),
}
}

Expand All @@ -408,6 +458,16 @@ impl LightProtocol {
)
}

/// Get the number of active light peers downloading from the
/// node
pub fn leecher_count(&self) -> usize {
let credit_limit = *self.flow_params.read().limit();
// Count the number of peers that used some credit
self.peers.read().iter()
.filter(|(_, p)| p.lock().local_credits.current() < credit_limit)
.count()
}

/// Make a request to a peer.
///
/// Fails on: nonexistent peer, network error, peer not server,
Expand Down Expand Up @@ -772,12 +832,16 @@ impl LightProtocol {
fn begin_new_cost_period(&self, io: &IoContext) {
self.load_distribution.end_period(&*self.sample_store);

let avg_peer_count = self.statistics.read().avg_peer_count();
// Load share relative to average peer count +LEECHER_COUNT_FACTOR%
let load_share = MAX_LIGHTSERV_LOAD / (avg_peer_count * LEECHER_COUNT_FACTOR);
let new_params = Arc::new(FlowParams::from_request_times(
|kind| self.load_distribution.expected_time(kind),
self.config.load_share,
load_share,
Duration::from_secs(self.config.max_stored_seconds),
));
*self.flow_params.write() = new_params.clone();
trace!(target: "pip", "New cost period: avg_peers={} ; cost_table:{:?}", avg_peer_count, new_params.cost_table());

let peers = self.peers.read();
let now = Instant::now();
Expand All @@ -797,6 +861,11 @@ impl LightProtocol {
peer_info.awaiting_acknowledge = Some((now, new_params.clone()));
}
}

fn tick_statistics(&self) {
let leecher_count = self.leecher_count();
self.statistics.write().add_peer_count(leecher_count);
}
}

impl LightProtocol {
Expand Down Expand Up @@ -1099,6 +1168,8 @@ impl NetworkProtocolHandler for LightProtocol {
.expect("Error registering sync timer.");
io.register_timer(RECALCULATE_COSTS_TIMEOUT, RECALCULATE_COSTS_INTERVAL)
.expect("Error registering request timer interval token.");
io.register_timer(STATISTICS_TIMEOUT, STATISTICS_INTERVAL)
.expect("Error registering statistics timer.");
}

fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
Expand All @@ -1119,6 +1190,7 @@ impl NetworkProtocolHandler for LightProtocol {
TICK_TIMEOUT => self.tick_handlers(&io),
PROPAGATE_TIMEOUT => self.propagate_transactions(&io),
RECALCULATE_COSTS_TIMEOUT => self.begin_new_cost_period(&io),
STATISTICS_TIMEOUT => self.tick_statistics(),
_ => warn!(target: "pip", "received timeout on unknown token {}", timer),
}
}
Expand Down
34 changes: 33 additions & 1 deletion ethcore/light/src/net/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ use ethcore::client::{EachBlockWith, TestBlockChainClient};
use ethcore::encoded;
use ethcore::ids::BlockId;
use ethereum_types::{H256, U256, Address};
use net::{LightProtocol, Params, packet, Peer};
use net::{LightProtocol, Params, packet, Peer, Statistics};
use net::context::IoContext;
use net::status::{Capabilities, Status};
use net::load_timer::MOVING_SAMPLE_SIZE;
use network::{PeerId, NodeId};
use provider::Provider;
use request;
Expand Down Expand Up @@ -780,3 +781,34 @@ fn get_transaction_index() {
let expected = Expect::Respond(packet::RESPONSE, response);
proto.handle_packet(&expected, 1, packet::REQUEST, &request_body);
}

#[test]
fn sync_statistics() {
let mut stats = Statistics::new();

// Empty set should return 1.0
assert_eq!(stats.avg_peer_count(), 1.0);

// Average < 1.0 should return 1.0
stats.add_peer_count(0);
assert_eq!(stats.avg_peer_count(), 1.0);

stats = Statistics::new();

const N: f64 = 50.0;

for i in 1..(N as usize + 1) {
Copy link
Collaborator

@niklasad1 niklasad1 Nov 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: Rust have inclusive range, the syntax is: for i in 0..=N

stats.add_peer_count(i);
}

// Compute the average for the sum 1..N
assert_eq!(stats.avg_peer_count(), N * (N + 1.0) / 2.0 / N);

for _ in 1..(MOVING_SAMPLE_SIZE + 1) {
stats.add_peer_count(40);
}

// Test that it returns the average of the last
// `MOVING_SAMPLE_SIZE` values
assert_eq!(stats.avg_peer_count(), 40.0);
}
27 changes: 4 additions & 23 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,10 @@ pub struct EthSync {

fn light_params(
network_id: u64,
max_peers: u32,
median_peers: f64,
pruning_info: PruningInfo,
sample_store: Option<Box<SampleStore>>,
) -> LightParams {
const MAX_LIGHTSERV_LOAD: f64 = 0.5;

let mut light_params = LightParams {
network_id: network_id,
config: Default::default(),
Expand All @@ -282,9 +280,7 @@ fn light_params(
sample_store: sample_store,
};

let max_peers = ::std::cmp::max(max_peers, 1);
light_params.config.load_share = MAX_LIGHTSERV_LOAD / max_peers as f64;

light_params.config.median_peers = median_peers;
light_params
}

Expand All @@ -301,9 +297,10 @@ impl EthSync {
.map(|mut p| { p.push("request_timings"); light_net::FileStore(p) })
.map(|store| Box::new(store) as Box<_>);

let median_peers = (params.network_config.min_peers + params.network_config.max_peers) as f64 / 2.0;
ordian marked this conversation as resolved.
Show resolved Hide resolved
let light_params = light_params(
params.config.network_id,
params.network_config.max_peers,
median_peers,
pruning_info,
sample_store,
);
Expand Down Expand Up @@ -940,19 +937,3 @@ impl LightSyncProvider for LightSync {
Default::default() // TODO
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn light_params_load_share_depends_on_max_peers() {
let pruning_info = PruningInfo {
earliest_chain: 0,
earliest_state: 0,
};
let params1 = light_params(0, 10, pruning_info.clone(), None);
let params2 = light_params(0, 20, pruning_info, None);
assert!(params1.config.load_share > params2.config.load_share)
}
}