Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Propagating transactions to peers on timer. #2035

Merged
merged 1 commit into from
Sep 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion ethcore/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use receipt::{Receipt, LocalizedReceipt};
use blockchain::extras::BlockReceipts;
use error::{ImportResult};
use evm::{Factory as EvmFactory, VMType};
use miner::{Miner, MinerService};
use miner::{Miner, MinerService, TransactionImportResult};
use spec::Spec;

use block_queue::BlockQueueInfo;
Expand Down Expand Up @@ -254,6 +254,24 @@ impl TestBlockChainClient {
BlockID::Latest | BlockID::Pending => self.numbers.read().get(&(self.numbers.read().len() - 1)).cloned()
}
}

/// Inserts a transaction to miners transactions queue.
pub fn insert_transaction_to_queue(&self) {
let keypair = Random.generate().unwrap();
let tx = Transaction {
action: Action::Create,
value: U256::from(100),
data: "3331600055".from_hex().unwrap(),
gas: U256::from(100_000),
gas_price: U256::one(),
nonce: U256::zero()
};
let signed_tx = tx.sign(keypair.secret());
self.set_balance(signed_tx.sender().unwrap(), 10_000_000.into());
let res = self.miner.import_external_transactions(self, vec![signed_tx]);
let res = res.into_iter().next().unwrap().expect("Successful import");
assert_eq!(res, TransactionImportResult::Current);
}
}

pub fn get_temp_journal_db() -> GuardedTempResult<Box<JournalDB>> {
Expand Down
1 change: 1 addition & 0 deletions sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
self.sync.write().maintain_peers(&mut NetSyncIo::new(io, &*self.chain));
self.sync.write().maintain_sync(&mut NetSyncIo::new(io, &*self.chain));
self.sync.write().propagate_new_transactions(&mut NetSyncIo::new(io, &*self.chain));
}
}

Expand Down
122 changes: 98 additions & 24 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ struct PeerInfo {
asking_hash: Option<H256>,
/// Request timestamp
ask_time: f64,
/// Pending request is expird and result should be ignored
/// Holds a set of transactions recently sent to this peer to avoid spamming.
last_sent_transactions: HashSet<H256>,
/// Pending request is expired and result should be ignored
expired: bool,
/// Peer fork confirmation status
confirmation: ForkConfirmation,
Expand Down Expand Up @@ -406,6 +408,7 @@ impl ChainSync {
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: 0f64,
last_sent_transactions: HashSet::new(),
expired: false,
confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
};
Expand Down Expand Up @@ -1447,42 +1450,65 @@ impl ChainSync {
}

/// propagates new transactions to all peers
fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize {
pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize {

// Early out of nobody to send to.
if self.peers.is_empty() {
return 0;
}

let mut transactions = io.chain().pending_transactions();
let transactions = io.chain().pending_transactions();
if transactions.is_empty() {
return 0;
}

let mut packet = RlpStream::new_list(transactions.len());
let tx_count = transactions.len();
for tx in transactions.drain(..) {
packet.append(&tx);
}
let rlp = packet.out();
let all_transactions_hashes = transactions.iter().map(|ref tx| tx.hash()).collect::<HashSet<H256>>();
let all_transactions_rlp = {
let mut packet = RlpStream::new_list(transactions.len());
for tx in &transactions { packet.append(tx); }
packet.out()
};

let lucky_peers = {
// sqrt(x)/x scaled to max u32
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
let small = self.peers.len() < MIN_PEERS_PROPAGATION;
let lucky_peers = self.peers.iter()
.filter_map(|(&p, _)| if small || ::rand::random::<u32>() < fraction { Some(p.clone()) } else { None })
.collect::<Vec<_>>();
// sqrt(x)/x scaled to max u32
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
let small = self.peers.len() < MIN_PEERS_PROPAGATION;

let lucky_peers = self.peers.iter_mut()
.filter(|_| small || ::rand::random::<u32>() < fraction)
.take(MAX_PEERS_PROPAGATION)
.filter_map(|(peer_id, mut peer_info)| {
// Send all transactions
if peer_info.last_sent_transactions.is_empty() {
peer_info.last_sent_transactions = all_transactions_hashes.clone();
return Some((*peer_id, all_transactions_rlp.clone()));
}

// taking at max of MAX_PEERS_PROPAGATION
lucky_peers.iter().cloned().take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::<Vec<PeerId>>()
};
// Get hashes of all transactions to send to this peer
let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions).cloned().collect::<HashSet<_>>();
if to_send.is_empty() {
return None;
}

// Construct RLP
let mut packet = RlpStream::new_list(to_send.len());
for tx in &transactions {
if to_send.contains(&tx.hash()) {
packet.append(tx);
}
}

peer_info.last_sent_transactions = to_send;
Some((*peer_id, packet.out()))
})
.collect::<Vec<_>>();

// Send RLPs
let sent = lucky_peers.len();
for peer_id in lucky_peers {
self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp.clone());
for (peer_id, rlp) in lucky_peers.into_iter() {
self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp);
}
trace!(target: "sync", "Sent {} transactions to {} peers.", tx_count, sent);

trace!(target: "sync", "Sent up to {} transactions to {} peers.", transactions.len(), sent);
sent
}

Expand Down Expand Up @@ -1512,16 +1538,18 @@ impl ChainSync {
self.check_resume(io);
}

/// called when block is imported to chain, updates transactions queue and propagates the blocks
/// called when block is imported to chain - propagates the blocks and updates transactions sent to peers
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256]) {
if io.is_chain_queue_empty() {
// Propagate latests blocks
self.propagate_latest_blocks(io, sealed);
}
if !invalid.is_empty() {
trace!(target: "sync", "Bad blocks in the queue, restarting");
self.restart_on_bad_block(io);
}
for peer_info in self.peers.values_mut() {
peer_info.last_sent_transactions.clear();
}
}
}

Expand Down Expand Up @@ -1723,6 +1751,7 @@ mod tests {
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: 0f64,
last_sent_transactions: HashSet::new(),
expired: false,
confirmation: super::ForkConfirmation::Confirmed,
});
Expand Down Expand Up @@ -1819,6 +1848,51 @@ mod tests {
assert_eq!(0x07, io.queue[0].packet_id);
}

#[test]
fn propagates_transactions() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.insert_transaction_to_queue();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
let mut queue = VecDeque::new();
let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagate_new_transactions(&mut io);
// Try to propagate same transactions for the second time
let peer_count2 = sync.propagate_new_transactions(&mut io);

// 1 message should be send
assert_eq!(1, io.queue.len());
// 1 peer should be updated but only once
assert_eq!(1, peer_count);
assert_eq!(0, peer_count2);
// TRANSACTIONS_PACKET
assert_eq!(0x02, io.queue[0].packet_id);
}

#[test]
fn propagates_transactions_again_after_new_block() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.insert_transaction_to_queue();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
let mut queue = VecDeque::new();
let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagate_new_transactions(&mut io);
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]);
// Try to propagate same transactions for the second time
let peer_count2 = sync.propagate_new_transactions(&mut io);

// 1 message should be send
assert_eq!(2, io.queue.len());
// 1 peer should be updated but only once
assert_eq!(1, peer_count);
assert_eq!(1, peer_count2);
// TRANSACTIONS_PACKET
assert_eq!(0x02, io.queue[0].packet_id);
assert_eq!(0x02, io.queue[1].packet_id);
}


#[test]
fn handles_peer_new_block_malformed() {
let mut client = TestBlockChainClient::new();
Expand Down