From 27a6d981b602c4e81a7eac2a2a8ee21dd658f9e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Herrero=20Carr=C3=B3n?= Date: Mon, 4 Feb 2019 16:23:22 +0100 Subject: [PATCH 01/11] * Make the specification of the protocol to which a packet_id belongs to explicit when calling "SyncRequester::send_packet". * Remove "SyncIO::send" and leave only "SyncIO::send_protocol" --- ethcore/sync/src/api.rs | 4 ++-- ethcore/sync/src/chain/mod.rs | 10 +++++----- ethcore/sync/src/chain/propagator.rs | 23 ++++++++++++----------- ethcore/sync/src/chain/requester.rs | 28 ++++++++++++---------------- ethcore/sync/src/sync_io.rs | 6 ------ 5 files changed, 31 insertions(+), 40 deletions(-) diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 28565db3968..69087c7225b 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -576,9 +576,9 @@ impl ChainNotify for EthSync { match message_type { ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message), ChainMessageType::PrivateTransaction(transaction_hash, message) => - self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PRIVATE_TRANSACTION_PACKET, message), + self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, WARP_SYNC_PROTOCOL_ID, PRIVATE_TRANSACTION_PACKET, message), ChainMessageType::SignedPrivateTransaction(transaction_hash, message) => - self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SIGNED_PRIVATE_TRANSACTION_PACKET, message), + self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, WARP_SYNC_PROTOCOL_ID, SIGNED_PRIVATE_TRANSACTION_PACKET, message), } }); } diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 3a8c0a19bcd..ff6ec3a1d4a 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -103,7 +103,7 @@ use fastmap::{H256FastMap, H256FastSet}; use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; use bytes::Bytes; use rlp::{RlpStream, DecoderError}; -use network::{self, PeerId, PacketId}; +use network::{self, PeerId, PacketId, ProtocolId}; use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo}; use ethcore::snapshot::{RestorationStatus}; use sync_io::SyncIo; @@ -111,7 +111,7 @@ use super::{WarpSync, SyncConfig}; use block_sync::{BlockDownloader, DownloadAction}; use rand::Rng; use snapshot::{Snapshot}; -use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID, PriorityTask}; +use api::{EthProtocolInfo as PeerInfoDigest, ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID, PriorityTask}; use private_tx::PrivateTxHandler; use transactions_stats::{TransactionsStats, Stats as TransactionStats}; use types::transaction::UnverifiedTransaction; @@ -481,7 +481,7 @@ impl ChainSyncApi { for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) { check_deadline(deadline)?; for peer in peers { - SyncPropagator::send_packet(io, *peer, NEW_BLOCK_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer, NEW_BLOCK_PACKET, rlp.clone()); if let Some(ref mut peer) = sync.peers.get_mut(peer) { peer.latest_hash = hash; } @@ -1328,8 +1328,8 @@ impl ChainSync { } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) { - SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet); + pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) { + SyncPropagator::propagate_private_transaction(self, io, transaction_hash, protocol, packet_id, packet); } } diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 74ef27e8872..f2915189585 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -17,10 +17,11 @@ use std::cmp; use std::collections::HashSet; +use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID}; use bytes::Bytes; use ethereum_types::H256; use fastmap::H256FastSet; -use network::{PeerId, PacketId}; +use network::{PeerId, PacketId,ProtocolId}; use rand::Rng; use rlp::{Encodable, RlpStream}; use sync_io::SyncIo; @@ -49,7 +50,7 @@ fn accepts_service_transaction(client_id: &str) -> bool { const LEGACY_CLIENT_ID_PREFIX: &'static str = "Parity/"; const PARITY_CLIENT_ID_PREFIX: &'static str = "Parity-Ethereum/"; const VERSION_PREFIX: &'static str = "/v"; - + let idx = client_id.rfind(VERSION_PREFIX).map(|idx| idx + VERSION_PREFIX.len()).unwrap_or(client_id.len()); let splitted = if client_id.starts_with(LEGACY_CLIENT_ID_PREFIX) || client_id.starts_with(PARITY_CLIENT_ID_PREFIX) { client_id[idx..].split('.') @@ -74,7 +75,7 @@ impl SyncPropagator { let sent = peers.len(); let mut send_packet = |io: &mut SyncIo, rlp: Bytes| { for peer_id in peers { - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { peer.latest_hash = chain_info.best_block_hash.clone(); } @@ -109,7 +110,7 @@ impl SyncPropagator { if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { peer.latest_hash = best_block_hash; } - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone()); } sent } @@ -177,7 +178,7 @@ impl SyncPropagator { let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| { let size = rlp.len(); - SyncPropagator::send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp); + SyncPropagator::send_packet(io, ETH_PROTOCOL, peer_id, TRANSACTIONS_PACKET, rlp); trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size); }; @@ -296,7 +297,7 @@ impl SyncPropagator { io.chain().chain_info().total_difficulty ); for peer_id in &peers { - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); } } } @@ -306,12 +307,12 @@ impl SyncPropagator { let lucky_peers = ChainSync::select_random_peers(&sync.get_consensus_peers()); trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers); for peer_id in lucky_peers { - SyncPropagator::send_packet(io, peer_id, CONSENSUS_DATA_PACKET, packet.clone()); + SyncPropagator::send_packet(io, WARP_SYNC_PROTOCOL_ID, peer_id, CONSENSUS_DATA_PACKET, packet.clone()); } } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) { + pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) { let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash)); if lucky_peers.is_empty() { error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected"); @@ -321,7 +322,7 @@ impl SyncPropagator { if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { peer.last_sent_private_transactions.insert(transaction_hash); } - SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone()); + SyncPropagator::send_packet(io, protocol, peer_id, packet_id, packet.clone()); } } } @@ -342,8 +343,8 @@ impl SyncPropagator { } /// Generic packet sender - pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { - if let Err(e) = sync.send(peer_id, packet_id, packet) { + pub fn send_packet(sync: &mut SyncIo, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { + if let Err(e) = sync.send_protocol(protocol, peer_id, packet_id, packet) { debug!(target:"sync", "Error sending packet: {:?}", e); sync.disconnect_peer(peer_id); } diff --git a/ethcore/sync/src/chain/requester.rs b/ethcore/sync/src/chain/requester.rs index 7307ffc4483..e30f01eee47 100644 --- a/ethcore/sync/src/chain/requester.rs +++ b/ethcore/sync/src/chain/requester.rs @@ -14,11 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Parity Ethereum. If not, see . -use api::WARP_SYNC_PROTOCOL_ID; +use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID}; use block_sync::BlockRequest; use bytes::Bytes; use ethereum_types::H256; -use network::{PeerId, PacketId}; +use network::{PeerId, PacketId, ProtocolId}; use rlp::RlpStream; use std::time::Instant; use sync_io::SyncIo; @@ -28,7 +28,6 @@ use super::{ BlockSet, ChainSync, PeerAsking, - ETH_PROTOCOL_VERSION_63, GET_BLOCK_BODIES_PACKET, GET_BLOCK_HEADERS_PACKET, GET_RECEIPTS_PACKET, @@ -62,7 +61,7 @@ impl SyncRequester { for h in &hashes { rlp.append(&h.clone()); } - SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, ETH_PROTOCOL, GET_BLOCK_BODIES_PACKET, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_blocks = hashes; peer.block_set = Some(set); @@ -76,7 +75,7 @@ impl SyncRequester { rlp.append(&1u32); rlp.append(&0u32); rlp.append(&0u32); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, GET_BLOCK_HEADERS_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, ETH_PROTOCOL, GET_BLOCK_HEADERS_PACKET, rlp.out()); } /// Find some headers or blocks to download for a peer. @@ -94,7 +93,7 @@ impl SyncRequester { pub fn request_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) { trace!(target: "sync", "{} <- GetSnapshotManifest", peer_id); let rlp = RlpStream::new_list(0); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GET_SNAPSHOT_MANIFEST_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, WARP_SYNC_PROTOCOL_ID, GET_SNAPSHOT_MANIFEST_PACKET, rlp.out()); } /// Request headers from a peer by block hash @@ -105,7 +104,7 @@ impl SyncRequester { rlp.append(&count); rlp.append(&skip); rlp.append(&if reverse {1u32} else {0u32}); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockHeaders, ETH_PROTOCOL, GET_BLOCK_HEADERS_PACKET, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_hash = Some(h.clone()); peer.block_set = Some(set); @@ -118,7 +117,7 @@ impl SyncRequester { for h in &hashes { rlp.append(&h.clone()); } - SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockReceipts, GET_RECEIPTS_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockReceipts, ETH_PROTOCOL, GET_RECEIPTS_PACKET, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_blocks = hashes; peer.block_set = Some(set); @@ -129,23 +128,20 @@ impl SyncRequester { trace!(target: "sync", "{} <- GetSnapshotData {:?}", peer_id, chunk); let mut rlp = RlpStream::new_list(1); rlp.append(chunk); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotData, GET_SNAPSHOT_DATA_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotData, WARP_SYNC_PROTOCOL_ID, GET_SNAPSHOT_DATA_PACKET, rlp.out()); } /// Generic request sender - fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { + fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) { if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { if peer.asking != PeerAsking::Nothing { warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); } peer.asking = asking; peer.ask_time = Instant::now(); - // TODO [ToDr] This seems quite fragile. Be careful when protocol is updated. - let result = if packet_id >= ETH_PROTOCOL_VERSION_63.1 { - io.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet) - } else { - io.send(peer_id, packet_id, packet) - }; + + let result = io.send_protocol(protocol, peer_id, packet_id, packet); + if let Err(e) = result { debug!(target:"sync", "Error sending request: {:?}", e); io.disconnect_peer(peer_id); diff --git a/ethcore/sync/src/sync_io.rs b/ethcore/sync/src/sync_io.rs index 81dc9cf90c5..a497e13402a 100644 --- a/ethcore/sync/src/sync_io.rs +++ b/ethcore/sync/src/sync_io.rs @@ -32,8 +32,6 @@ pub trait SyncIo { fn disconnect_peer(&mut self, peer_id: PeerId); /// Respond to current request with a packet. Can be called from an IO handler for incoming packet. fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), Error>; - /// Send a packet to a peer. - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error>; /// Send a packet to a peer using specified protocol. fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error>; /// Get the blockchain @@ -98,10 +96,6 @@ impl<'s> SyncIo for NetSyncIo<'s> { self.network.respond(packet_id, data) } - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error>{ - self.network.send(peer_id, packet_id, data) - } - fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error>{ self.network.send_protocol(protocol, peer_id, packet_id, data) } From 584ed4f8b29a45b015c988443bdd874c3b4c2351 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Herrero=20Carr=C3=B3n?= Date: Mon, 4 Feb 2019 17:10:04 +0100 Subject: [PATCH 02/11] Adapt tests to new code. --- ethcore/sync/src/tests/helpers.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 85c20384579..cb19577613e 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -102,7 +102,7 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { Ok(()) } - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), network::Error> { + fn send_protocol(&mut self, _protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), network::Error> { self.packets.push(TestPacket { data: data, packet_id: packet_id, @@ -111,10 +111,6 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { Ok(()) } - fn send_protocol(&mut self, _protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), network::Error> { - self.send(peer_id, packet_id, data) - } - fn chain(&self) -> &BlockChainClient { &*self.chain } @@ -234,9 +230,9 @@ impl EthPeer where C: FlushingBlockChainClient { match message { ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data), ChainMessageType::PrivateTransaction(transaction_hash, data) => - self.sync.write().propagate_private_transaction(&mut io, transaction_hash, PRIVATE_TRANSACTION_PACKET, data), + self.sync.write().propagate_private_transaction(&mut io, transaction_hash, WARP_SYNC_PROTOCOL_ID, PRIVATE_TRANSACTION_PACKET, data), ChainMessageType::SignedPrivateTransaction(transaction_hash, data) => - self.sync.write().propagate_private_transaction(&mut io, transaction_hash, SIGNED_PRIVATE_TRANSACTION_PACKET, data), + self.sync.write().propagate_private_transaction(&mut io, transaction_hash, WARP_SYNC_PROTOCOL_ID, SIGNED_PRIVATE_TRANSACTION_PACKET, data), } } From aecfb1f2a3d16e2eb808095f9542f49b67ddd30a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Herrero=20Carr=C3=B3n?= Date: Wed, 6 Feb 2019 10:29:23 +0100 Subject: [PATCH 03/11] Strengthen tests to check if packet_id and protocol match when sending a devp2p packet. --- ethcore/sync/src/chain/mod.rs | 2 +- ethcore/sync/src/tests/helpers.rs | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index ff6ec3a1d4a..1d3ea7df510 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -153,7 +153,7 @@ const MAX_TRANSACTION_PACKET_SIZE: usize = 5 * 1024 * 1024; const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000; const SNAPSHOT_MIN_PEERS: usize = 3; -const STATUS_PACKET: u8 = 0x00; +pub const STATUS_PACKET: u8 = 0x00; const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; const TRANSACTIONS_PACKET: u8 = 0x02; pub const GET_BLOCK_HEADERS_PACKET: u8 = 0x03; diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index cb19577613e..0dd4cec4538 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -30,8 +30,8 @@ use ethcore::miner::Miner; use ethcore::test_helpers; use sync_io::SyncIo; use io::{IoChannel, IoContext, IoHandler}; -use api::WARP_SYNC_PROTOCOL_ID; -use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET, SyncSupplier}; +use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID}; +use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SyncSupplier, STATUS_PACKET, RECEIPTS_PACKET, GET_SNAPSHOT_MANIFEST_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET}; use SyncConfig; use private_tx::SimplePrivateTxHandler; use types::BlockNumber; @@ -80,6 +80,16 @@ impl<'p, C> Drop for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { } } +fn assert_packet_id_matches_protocol(protocol: &ProtocolId, packet_id: &PacketId) { + match packet_id { + STATUS_PACKET ... RECEIPTS_PACKET => assert_eq!(*protocol, ETH_PROTOCOL), + GET_SNAPSHOT_MANIFEST_PACKET ... SIGNED_PRIVATE_TRANSACTION_PACKET => assert_eq!(*protocol, WARP_SYNC_PROTOCOL_ID), + // What about light? + _ => assert!(false) + } +} + + impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { fn disable_peer(&mut self, peer_id: PeerId) { self.disconnect_peer(peer_id); @@ -102,7 +112,9 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { Ok(()) } - fn send_protocol(&mut self, _protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), network::Error> { + fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), network::Error> { + assert_packet_id_matches_protocol(&protocol, &packet_id); + self.packets.push(TestPacket { data: data, packet_id: packet_id, From 0bef1ce2dca5d0e576538cfa965f551cf500e02b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Herrero=20Carr=C3=B3n?= Date: Thu, 7 Feb 2019 17:41:01 +0100 Subject: [PATCH 04/11] Early proposal for binding packet_id and protocol together in chain sync. --- Cargo.lock | 10 +++ ethcore/sync/Cargo.toml | 1 + ethcore/sync/src/chain/mod.rs | 1 + ethcore/sync/src/chain/packet.rs | 108 +++++++++++++++++++++++++++++++ ethcore/sync/src/lib.rs | 2 + 5 files changed, 122 insertions(+) create mode 100644 ethcore/sync/src/chain/packet.rs diff --git a/Cargo.lock b/Cargo.lock index 6f24f667f09..697f9273429 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -578,6 +578,14 @@ dependencies = [ "heapsize 0.4.2 (git+https://github.com/cheme/heapsize.git?branch=ec-macfix)", ] +[[package]] +name = "enum_primitive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-traits 0.1.43 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "env_logger" version = "0.5.13" @@ -1075,6 +1083,7 @@ name = "ethcore-sync" version = "1.12.0" dependencies = [ "common-types 0.1.0", + "enum_primitive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore 1.12.0", "ethcore-io 1.12.0", @@ -4470,6 +4479,7 @@ dependencies = [ "checksum edit-distance 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3bd26878c3d921f89797a4e1a1711919f999a9f6946bb6f5a4ffda126d297b7e" "checksum either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3be565ca5c557d7f59e7cfcf1844f9e3033650c929c6566f511e8005f205c1d0" "checksum elastic-array 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "88d4851b005ef16de812ea9acdb7bece2f0a40dd86c07b85631d7dafa54537bb" +"checksum enum_primitive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "be4551092f4d519593039259a9ed8daedf0da12e5109c5280338073eaeb81180" "checksum env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)" = "15b0a4d2e39f8420210be8b27eeda28029729e2fd4291019455016c348240c38" "checksum error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "07e791d3be96241c77c43846b665ef1384606da2cd2a48730abe606a12906e02" "checksum eth-secp256k1 0.5.7 (git+https://github.com/paritytech/rust-secp256k1)" = "" diff --git a/ethcore/sync/Cargo.toml b/ethcore/sync/Cargo.toml index c8545c00f02..8f05d0e8649 100644 --- a/ethcore/sync/Cargo.toml +++ b/ethcore/sync/Cargo.toml @@ -10,6 +10,7 @@ authors = ["Parity Technologies "] [dependencies] common-types = { path = "../types" } env_logger = "0.5" +enum_primitive = "*" ethcore = { path = ".." } ethcore-io = { path = "../../util/io" } ethcore-light = { path = "../light" } diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 1d3ea7df510..b56ab27c2b7 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -88,6 +88,7 @@ //! All other messages are ignored. mod handler; +mod packet; mod propagator; mod requester; mod supplier; diff --git a/ethcore/sync/src/chain/packet.rs b/ethcore/sync/src/chain/packet.rs new file mode 100644 index 00000000000..cb170f75a3f --- /dev/null +++ b/ethcore/sync/src/chain/packet.rs @@ -0,0 +1,108 @@ +use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID}; +use network::{PacketId, ProtocolId}; + +enum_from_primitive! { +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum PacketIds { + StatusPacket = 0x00, + NewBlockHashesPacket = 0x01, + TransactionsPacket = 0x02, + GetBlockHeadersPacket = 0x03, + BlockHeadersPacket = 0x04, + GetBlockBodiesPacket = 0x05, + BlockBodiesPacket = 0x06, + NewBlockPacket = 0x07, + + GetNodeDataPacket = 0x0d, + NodeDataPacket = 0x0e, + GetReceiptsPacket = 0x0f, + ReceiptsPacket = 0x10, + + GetSnapshotManifestPacket = 0x11, + SnapshotManifestPacket = 0x12, + GetSnapshotDataPacket = 0x13, + SnapshotDataPacket = 0x14, + ConsensusDataPacket = 0x15, + PrivateTransactionPacket = 0x16, + SignedPrivateTransactionPacket = 0x17, +} +} + + +use self::PacketIds::*; + +pub trait Packet { + fn id(&self) -> PacketId; + fn protocol(&self) -> ProtocolId; +} + +impl Packet for PacketIds { + fn protocol(&self) -> ProtocolId { + match self { + StatusPacket | + NewBlockHashesPacket | + TransactionsPacket | + GetBlockHeadersPacket | + BlockHeadersPacket | + GetBlockBodiesPacket | + BlockBodiesPacket | + NewBlockPacket | + + GetNodeDataPacket| + NodeDataPacket | + GetReceiptsPacket | + ReceiptsPacket + + => ETH_PROTOCOL, + + GetSnapshotManifestPacket| + SnapshotManifestPacket | + GetSnapshotDataPacket | + SnapshotDataPacket | + ConsensusDataPacket | + PrivateTransactionPacket | + SignedPrivateTransactionPacket + + => WARP_SYNC_PROTOCOL_ID, + } + } + + fn id(&self) -> PacketId { + (*self) as PacketId + } +} + + +#[cfg(test)] +mod tests { + use super::*; + + use enum_primitive::FromPrimitive; + + #[test] + fn packet_ids_from_u8_when_from_primitive_zero_then_equals_status_packet() { + assert_eq!(PacketIds::from_u8(0x00), Some(StatusPacket)); + } + + #[test] + fn packet_ids_from_u8_when_from_primitive_eleven_then_equals_get_snapshot_manifest_packet() { + assert_eq!(PacketIds::from_u8(0x11), Some(GetSnapshotManifestPacket)); + } + + #[test] + fn packet_ids_from_u8_when_invalid_packet_id_then_none() { + assert!(PacketIds::from_u8(0x99).is_none()); + } + + #[test] + fn when_status_packet_then_id_and_protocol_match() { + assert_eq!(StatusPacket.id(), StatusPacket as PacketId); + assert_eq!(StatusPacket.protocol(), ETH_PROTOCOL); + } + + #[test] + fn when_consensus_data_packet_then_id_and_protocol_match() { + assert_eq!(ConsensusDataPacket.id(), ConsensusDataPacket as PacketId); + assert_eq!(ConsensusDataPacket.protocol(), WARP_SYNC_PROTOCOL_ID); + } +} diff --git a/ethcore/sync/src/lib.rs b/ethcore/sync/src/lib.rs index 72fc11cd7a8..dd740f7357f 100644 --- a/ethcore/sync/src/lib.rs +++ b/ethcore/sync/src/lib.rs @@ -44,6 +44,8 @@ extern crate ethcore_light as light; #[cfg(test)] extern crate kvdb_memorydb; #[cfg(test)] extern crate rustc_hex; +#[macro_use] +extern crate enum_primitive; #[macro_use] extern crate macros; #[macro_use] From 5d8bc34d331c9aebeecc3edd9f2081c2ba119da0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Herrero=20Carr=C3=B3n?= Date: Thu, 7 Feb 2019 19:31:55 +0100 Subject: [PATCH 05/11] Replace PacketId with SyncPacketId in chain sync. --- ethcore/sync/Cargo.toml | 2 +- ethcore/sync/src/api.rs | 8 +- ethcore/sync/src/chain/handler.rs | 74 ++++++------ ethcore/sync/src/chain/mod.rs | 37 ++---- ethcore/sync/src/chain/packet.rs | 12 +- ethcore/sync/src/chain/propagator.rs | 26 ++--- ethcore/sync/src/chain/requester.rs | 25 ++-- ethcore/sync/src/chain/supplier.rs | 167 +++++++++++++-------------- ethcore/sync/src/tests/helpers.rs | 23 +--- 9 files changed, 164 insertions(+), 210 deletions(-) diff --git a/ethcore/sync/Cargo.toml b/ethcore/sync/Cargo.toml index 8f05d0e8649..e231a6d19dc 100644 --- a/ethcore/sync/Cargo.toml +++ b/ethcore/sync/Cargo.toml @@ -10,7 +10,7 @@ authors = ["Parity Technologies "] [dependencies] common-types = { path = "../types" } env_logger = "0.5" -enum_primitive = "*" +enum_primitive = "0.1.1" ethcore = { path = ".." } ethcore-io = { path = "../../util/io" } ethcore-light = { path = "../light" } diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 69087c7225b..5b6b878f952 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -38,8 +38,8 @@ use std::net::{SocketAddr, AddrParseError}; use std::str::FromStr; use parking_lot::{RwLock, Mutex}; use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62, - PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3, - PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET}; + PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3}; +use chain::packet::SyncPacketId::*; use light::client::AsLightClient; use light::Provider; use light::net::{ @@ -576,9 +576,9 @@ impl ChainNotify for EthSync { match message_type { ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message), ChainMessageType::PrivateTransaction(transaction_hash, message) => - self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, WARP_SYNC_PROTOCOL_ID, PRIVATE_TRANSACTION_PACKET, message), + self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PrivateTransactionPacket, message), ChainMessageType::SignedPrivateTransaction(transaction_hash, message) => - self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, WARP_SYNC_PROTOCOL_ID, SIGNED_PRIVATE_TRANSACTION_PACKET, message), + self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SignedPrivateTransactionPacket, message), } }); } diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 5630ea25636..2748ccdfe9e 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -17,6 +17,7 @@ use api::WARP_SYNC_PROTOCOL_ID; use block_sync::{BlockDownloaderImportError as DownloaderImportError, DownloadAction}; use bytes::Bytes; +use enum_primitive::FromPrimitive; use ethcore::error::{Error as EthcoreError, ErrorKind as EthcoreErrorKind, ImportErrorKind, BlockError}; use ethcore::snapshot::{ManifestData, RestorationStatus}; use ethcore::verification::queue::kind::blocks::Unverified; @@ -32,6 +33,7 @@ use types::BlockNumber; use types::block_status::BlockStatus; use types::ids::BlockId; +use super::packet::{Packet, SyncPacketId, SyncPacketId::*}; use super::{ BlockSet, ChainSync, @@ -47,16 +49,6 @@ use super::{ MAX_NEW_HASHES, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, - BLOCK_BODIES_PACKET, - BLOCK_HEADERS_PACKET, - NEW_BLOCK_HASHES_PACKET, - NEW_BLOCK_PACKET, - PRIVATE_TRANSACTION_PACKET, - RECEIPTS_PACKET, - SIGNED_PRIVATE_TRANSACTION_PACKET, - SNAPSHOT_DATA_PACKET, - SNAPSHOT_MANIFEST_PACKET, - STATUS_PACKET, }; /// The Chain Sync Handler: handles responses from peers @@ -66,36 +58,40 @@ impl SyncHandler { /// Handle incoming packet from peer pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { let rlp = Rlp::new(data); - let result = match packet_id { - STATUS_PACKET => SyncHandler::on_peer_status(sync, io, peer, &rlp), - BLOCK_HEADERS_PACKET => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp), - BLOCK_BODIES_PACKET => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp), - RECEIPTS_PACKET => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp), - NEW_BLOCK_PACKET => SyncHandler::on_peer_new_block(sync, io, peer, &rlp), - NEW_BLOCK_HASHES_PACKET => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp), - SNAPSHOT_MANIFEST_PACKET => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp), - SNAPSHOT_DATA_PACKET => SyncHandler::on_snapshot_data(sync, io, peer, &rlp), - PRIVATE_TRANSACTION_PACKET => SyncHandler::on_private_transaction(sync, io, peer, &rlp), - SIGNED_PRIVATE_TRANSACTION_PACKET => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp), - _ => { - debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id); - Ok(()) - } - }; + if let Some(packet_id) = SyncPacketId::from_u8(packet_id) { + let result = match packet_id { + StatusPacket => SyncHandler::on_peer_status(sync, io, peer, &rlp), + BlockHeadersPacket => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp), + BlockBodiesPacket => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp), + ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp), + NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp), + NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp), + SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp), + SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp), + PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp), + SignedPrivateTransactionPacket => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp), + _ => { + debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id()); + Ok(()) + } + }; - match result { - Err(DownloaderImportError::Invalid) => { - debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id); - io.disable_peer(peer); - sync.deactivate_peer(io, peer); - }, - Err(DownloaderImportError::Useless) => { - sync.deactivate_peer(io, peer); - }, - Ok(()) => { - // give a task to the same peer first - sync.sync_peer(io, peer, false); - }, + match result { + Err(DownloaderImportError::Invalid) => { + debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id.id()); + io.disable_peer(peer); + sync.deactivate_peer(io, peer); + }, + Err(DownloaderImportError::Useless) => { + sync.deactivate_peer(io, peer); + }, + Ok(()) => { + // give a task to the same peer first + sync.sync_peer(io, peer, false); + }, + } + } else { + debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id); } } diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index b56ab27c2b7..7c7810001c9 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -88,7 +88,7 @@ //! All other messages are ignored. mod handler; -mod packet; +pub mod packet; mod propagator; mod requester; mod supplier; @@ -104,7 +104,7 @@ use fastmap::{H256FastMap, H256FastSet}; use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; use bytes::Bytes; use rlp::{RlpStream, DecoderError}; -use network::{self, PeerId, PacketId, ProtocolId}; +use network::{self, PeerId, PacketId}; use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo}; use ethcore::snapshot::{RestorationStatus}; use sync_io::SyncIo; @@ -112,13 +112,14 @@ use super::{WarpSync, SyncConfig}; use block_sync::{BlockDownloader, DownloadAction}; use rand::Rng; use snapshot::{Snapshot}; -use api::{EthProtocolInfo as PeerInfoDigest, ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID, PriorityTask}; +use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID, PriorityTask}; use private_tx::PrivateTxHandler; use transactions_stats::{TransactionsStats, Stats as TransactionStats}; use types::transaction::UnverifiedTransaction; use types::BlockNumber; use self::handler::SyncHandler; +use self::packet::{Packet, SyncPacketId, SyncPacketId::*}; use self::propagator::SyncPropagator; use self::requester::SyncRequester; pub(crate) use self::supplier::SyncSupplier; @@ -154,28 +155,6 @@ const MAX_TRANSACTION_PACKET_SIZE: usize = 5 * 1024 * 1024; const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000; const SNAPSHOT_MIN_PEERS: usize = 3; -pub const STATUS_PACKET: u8 = 0x00; -const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; -const TRANSACTIONS_PACKET: u8 = 0x02; -pub const GET_BLOCK_HEADERS_PACKET: u8 = 0x03; -pub const BLOCK_HEADERS_PACKET: u8 = 0x04; -pub const GET_BLOCK_BODIES_PACKET: u8 = 0x05; -const BLOCK_BODIES_PACKET: u8 = 0x06; -const NEW_BLOCK_PACKET: u8 = 0x07; - -pub const GET_NODE_DATA_PACKET: u8 = 0x0d; -pub const NODE_DATA_PACKET: u8 = 0x0e; -pub const GET_RECEIPTS_PACKET: u8 = 0x0f; -pub const RECEIPTS_PACKET: u8 = 0x10; - -pub const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11; -pub const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12; -pub const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13; -pub const SNAPSHOT_DATA_PACKET: u8 = 0x14; -pub const CONSENSUS_DATA_PACKET: u8 = 0x15; -pub const PRIVATE_TRANSACTION_PACKET: u8 = 0x16; -pub const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17; - const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3; const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(5); @@ -482,7 +461,7 @@ impl ChainSyncApi { for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) { check_deadline(deadline)?; for peer in peers { - SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer, NEW_BLOCK_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, *peer, NewBlockPacket, rlp.clone()); if let Some(ref mut peer) = sync.peers.get_mut(peer) { peer.latest_hash = hash; } @@ -1144,7 +1123,7 @@ impl ChainSync { } } packet.complete_unbounded_list(); - io.respond(STATUS_PACKET, packet.out()) + io.respond(StatusPacket.id(), packet.out()) } pub fn maintain_peers(&mut self, io: &mut SyncIo) { @@ -1329,8 +1308,8 @@ impl ChainSync { } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) { - SyncPropagator::propagate_private_transaction(self, io, transaction_hash, protocol, packet_id, packet); + pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacketId, packet: Bytes) { + SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet); } } diff --git a/ethcore/sync/src/chain/packet.rs b/ethcore/sync/src/chain/packet.rs index cb170f75a3f..39f95a3959a 100644 --- a/ethcore/sync/src/chain/packet.rs +++ b/ethcore/sync/src/chain/packet.rs @@ -3,7 +3,7 @@ use network::{PacketId, ProtocolId}; enum_from_primitive! { #[derive(Clone, Copy, Debug, PartialEq)] -pub enum PacketIds { +pub enum SyncPacketId { StatusPacket = 0x00, NewBlockHashesPacket = 0x01, TransactionsPacket = 0x02, @@ -29,14 +29,14 @@ pub enum PacketIds { } -use self::PacketIds::*; +use self::SyncPacketId::*; pub trait Packet { fn id(&self) -> PacketId; fn protocol(&self) -> ProtocolId; } -impl Packet for PacketIds { +impl Packet for SyncPacketId { fn protocol(&self) -> ProtocolId { match self { StatusPacket | @@ -81,17 +81,17 @@ mod tests { #[test] fn packet_ids_from_u8_when_from_primitive_zero_then_equals_status_packet() { - assert_eq!(PacketIds::from_u8(0x00), Some(StatusPacket)); + assert_eq!(SyncPacketId::from_u8(0x00), Some(StatusPacket)); } #[test] fn packet_ids_from_u8_when_from_primitive_eleven_then_equals_get_snapshot_manifest_packet() { - assert_eq!(PacketIds::from_u8(0x11), Some(GetSnapshotManifestPacket)); + assert_eq!(SyncPacketId::from_u8(0x11), Some(GetSnapshotManifestPacket)); } #[test] fn packet_ids_from_u8_when_invalid_packet_id_then_none() { - assert!(PacketIds::from_u8(0x99).is_none()); + assert!(SyncPacketId::from_u8(0x99).is_none()); } #[test] diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index f2915189585..b0e36b0a5cc 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -17,11 +17,10 @@ use std::cmp; use std::collections::HashSet; -use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID}; use bytes::Bytes; use ethereum_types::H256; use fastmap::H256FastSet; -use network::{PeerId, PacketId,ProtocolId}; +use network::{PeerId}; use rand::Rng; use rlp::{Encodable, RlpStream}; use sync_io::SyncIo; @@ -29,6 +28,7 @@ use types::transaction::SignedTransaction; use types::BlockNumber; use types::blockchain_info::BlockChainInfo; +use super::packet::{Packet, SyncPacketId, SyncPacketId::*}; use super::{ random, ChainSync, @@ -36,10 +36,6 @@ use super::{ MAX_PEER_LAG_PROPAGATION, MAX_PEERS_PROPAGATION, MIN_PEERS_PROPAGATION, - CONSENSUS_DATA_PACKET, - NEW_BLOCK_HASHES_PACKET, - NEW_BLOCK_PACKET, - TRANSACTIONS_PACKET, }; /// Checks if peer is able to process service transactions @@ -75,7 +71,7 @@ impl SyncPropagator { let sent = peers.len(); let mut send_packet = |io: &mut SyncIo, rlp: Bytes| { for peer_id in peers { - SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, *peer_id, NewBlockPacket, rlp.clone()); if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { peer.latest_hash = chain_info.best_block_hash.clone(); } @@ -110,7 +106,7 @@ impl SyncPropagator { if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { peer.latest_hash = best_block_hash; } - SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, *peer_id, NewBlockHashesPacket, rlp.clone()); } sent } @@ -178,7 +174,7 @@ impl SyncPropagator { let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| { let size = rlp.len(); - SyncPropagator::send_packet(io, ETH_PROTOCOL, peer_id, TRANSACTIONS_PACKET, rlp); + SyncPropagator::send_packet(io, peer_id, TransactionsPacket, rlp); trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size); }; @@ -297,7 +293,7 @@ impl SyncPropagator { io.chain().chain_info().total_difficulty ); for peer_id in &peers { - SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, *peer_id, NewBlockPacket, rlp.clone()); } } } @@ -307,12 +303,12 @@ impl SyncPropagator { let lucky_peers = ChainSync::select_random_peers(&sync.get_consensus_peers()); trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers); for peer_id in lucky_peers { - SyncPropagator::send_packet(io, WARP_SYNC_PROTOCOL_ID, peer_id, CONSENSUS_DATA_PACKET, packet.clone()); + SyncPropagator::send_packet(io, peer_id, ConsensusDataPacket, packet.clone()); } } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) { + pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacketId, packet: Bytes) { let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash)); if lucky_peers.is_empty() { error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected"); @@ -322,7 +318,7 @@ impl SyncPropagator { if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { peer.last_sent_private_transactions.insert(transaction_hash); } - SyncPropagator::send_packet(io, protocol, peer_id, packet_id, packet.clone()); + SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone()); } } } @@ -343,8 +339,8 @@ impl SyncPropagator { } /// Generic packet sender - pub fn send_packet(sync: &mut SyncIo, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { - if let Err(e) = sync.send_protocol(protocol, peer_id, packet_id, packet) { + pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: SyncPacketId, packet: Bytes) { + if let Err(e) = sync.send_protocol(packet_id.protocol(), peer_id, packet_id.id(), packet) { debug!(target:"sync", "Error sending packet: {:?}", e); sync.disconnect_peer(peer_id); } diff --git a/ethcore/sync/src/chain/requester.rs b/ethcore/sync/src/chain/requester.rs index e30f01eee47..5e80b71a249 100644 --- a/ethcore/sync/src/chain/requester.rs +++ b/ethcore/sync/src/chain/requester.rs @@ -14,25 +14,20 @@ // You should have received a copy of the GNU General Public License // along with Parity Ethereum. If not, see . -use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID}; use block_sync::BlockRequest; use bytes::Bytes; use ethereum_types::H256; -use network::{PeerId, PacketId, ProtocolId}; +use network::{PeerId}; use rlp::RlpStream; use std::time::Instant; use sync_io::SyncIo; use types::BlockNumber; +use super::packet::{Packet, SyncPacketId, SyncPacketId::*}; use super::{ BlockSet, ChainSync, PeerAsking, - GET_BLOCK_BODIES_PACKET, - GET_BLOCK_HEADERS_PACKET, - GET_RECEIPTS_PACKET, - GET_SNAPSHOT_DATA_PACKET, - GET_SNAPSHOT_MANIFEST_PACKET, }; /// The Chain Sync Requester: requesting data to other peers @@ -61,7 +56,7 @@ impl SyncRequester { for h in &hashes { rlp.append(&h.clone()); } - SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, ETH_PROTOCOL, GET_BLOCK_BODIES_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, GetBlockBodiesPacket, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_blocks = hashes; peer.block_set = Some(set); @@ -75,7 +70,7 @@ impl SyncRequester { rlp.append(&1u32); rlp.append(&0u32); rlp.append(&0u32); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, ETH_PROTOCOL, GET_BLOCK_HEADERS_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, GetBlockHeadersPacket, rlp.out()); } /// Find some headers or blocks to download for a peer. @@ -93,7 +88,7 @@ impl SyncRequester { pub fn request_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) { trace!(target: "sync", "{} <- GetSnapshotManifest", peer_id); let rlp = RlpStream::new_list(0); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, WARP_SYNC_PROTOCOL_ID, GET_SNAPSHOT_MANIFEST_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GetSnapshotManifestPacket, rlp.out()); } /// Request headers from a peer by block hash @@ -104,7 +99,7 @@ impl SyncRequester { rlp.append(&count); rlp.append(&skip); rlp.append(&if reverse {1u32} else {0u32}); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockHeaders, ETH_PROTOCOL, GET_BLOCK_HEADERS_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockHeaders, GetBlockHeadersPacket, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_hash = Some(h.clone()); peer.block_set = Some(set); @@ -117,7 +112,7 @@ impl SyncRequester { for h in &hashes { rlp.append(&h.clone()); } - SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockReceipts, ETH_PROTOCOL, GET_RECEIPTS_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockReceipts, GetReceiptsPacket, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_blocks = hashes; peer.block_set = Some(set); @@ -128,11 +123,11 @@ impl SyncRequester { trace!(target: "sync", "{} <- GetSnapshotData {:?}", peer_id, chunk); let mut rlp = RlpStream::new_list(1); rlp.append(chunk); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotData, WARP_SYNC_PROTOCOL_ID, GET_SNAPSHOT_DATA_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotData, GetSnapshotDataPacket, rlp.out()); } /// Generic request sender - fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) { + fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: SyncPacketId, packet: Bytes) { if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { if peer.asking != PeerAsking::Nothing { warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); @@ -140,7 +135,7 @@ impl SyncRequester { peer.asking = asking; peer.ask_time = Instant::now(); - let result = io.send_protocol(protocol, peer_id, packet_id, packet); + let result = io.send_protocol(packet_id.protocol(), peer_id, packet_id.id(), packet); if let Err(e) = result { debug!(target:"sync", "Error sending request: {:?}", e); diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs index 8656abcffd4..20ea3ba3bfa 100644 --- a/ethcore/sync/src/chain/supplier.rs +++ b/ethcore/sync/src/chain/supplier.rs @@ -15,6 +15,7 @@ // along with Parity Ethereum. If not, see . use bytes::Bytes; +use enum_primitive::FromPrimitive; use ethereum_types::H256; use network::{self, PeerId}; use parking_lot::RwLock; @@ -25,30 +26,17 @@ use types::ids::BlockId; use sync_io::SyncIo; +use super::packet::{Packet, SyncPacketId, SyncPacketId::*}; + use super::{ ChainSync, SyncHandler, RlpResponseResult, PacketDecodeError, - BLOCK_BODIES_PACKET, - BLOCK_HEADERS_PACKET, - CONSENSUS_DATA_PACKET, - GET_BLOCK_BODIES_PACKET, - GET_BLOCK_HEADERS_PACKET, - GET_NODE_DATA_PACKET, - GET_RECEIPTS_PACKET, - GET_SNAPSHOT_DATA_PACKET, - GET_SNAPSHOT_MANIFEST_PACKET, MAX_BODIES_TO_SEND, MAX_HEADERS_TO_SEND, MAX_NODE_DATA_TO_SEND, MAX_RECEIPTS_HEADERS_TO_SEND, - NODE_DATA_PACKET, - RECEIPTS_PACKET, - SNAPSHOT_DATA_PACKET, - SNAPSHOT_MANIFEST_PACKET, - STATUS_PACKET, - TRANSACTIONS_PACKET, }; /// The Chain Sync Supplier: answers requests from peers with available data @@ -56,72 +44,83 @@ pub struct SyncSupplier; impl SyncSupplier { /// Dispatch incoming requests and responses + // Take a u8 and not a SyncPacketId because this is the entry point + // to chain sync from the outside world. pub fn dispatch_packet(sync: &RwLock, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { let rlp = Rlp::new(data); - let result = match packet_id { - GET_BLOCK_BODIES_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, - SyncSupplier::return_block_bodies, - |e| format!("Error sending block bodies: {:?}", e)), - - GET_BLOCK_HEADERS_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, - SyncSupplier::return_block_headers, - |e| format!("Error sending block headers: {:?}", e)), - - GET_RECEIPTS_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, - SyncSupplier::return_receipts, - |e| format!("Error sending receipts: {:?}", e)), - - GET_NODE_DATA_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, - SyncSupplier::return_node_data, - |e| format!("Error sending nodes: {:?}", e)), - - GET_SNAPSHOT_MANIFEST_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, - SyncSupplier::return_snapshot_manifest, - |e| format!("Error sending snapshot manifest: {:?}", e)), - - GET_SNAPSHOT_DATA_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, - SyncSupplier::return_snapshot_data, - |e| format!("Error sending snapshot data: {:?}", e)), - - STATUS_PACKET => { - sync.write().on_packet(io, peer, packet_id, data); - Ok(()) - }, - // Packets that require the peer to be confirmed - _ => { - if !sync.read().peers.contains_key(&peer) { - debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer)); - return; - } - debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id); - - match packet_id { - CONSENSUS_DATA_PACKET => { - SyncHandler::on_consensus_packet(io, peer, &rlp) - }, - TRANSACTIONS_PACKET => { - let res = { - let sync_ro = sync.read(); - SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp) - }; - if res.is_err() { - // peer sent invalid data, disconnect. - io.disable_peer(peer); - sync.write().deactivate_peer(io, peer); + if let Some(id) = SyncPacketId::from_u8(packet_id) { + let result = match id { + GetBlockBodiesPacket => SyncSupplier::return_rlp( + io, &rlp, peer, + SyncSupplier::return_block_bodies, + |e| format!("Error sending block bodies: {:?}", e)), + + GetBlockHeadersPacket => SyncSupplier::return_rlp( + io, &rlp, peer, + SyncSupplier::return_block_headers, + |e| format!("Error sending block headers: {:?}", e)), + + GetReceiptsPacket => SyncSupplier::return_rlp( + io, &rlp, peer, + SyncSupplier::return_receipts, + |e| format!("Error sending receipts: {:?}", e)), + + GetNodeDataPacket => SyncSupplier::return_rlp( + io, &rlp, peer, + SyncSupplier::return_node_data, + |e| format!("Error sending nodes: {:?}", e)), + + GetSnapshotManifestPacket => SyncSupplier::return_rlp( + io, &rlp, peer, + SyncSupplier::return_snapshot_manifest, + |e| format!("Error sending snapshot manifest: {:?}", e)), + + GetSnapshotDataPacket => SyncSupplier::return_rlp( + io, &rlp, peer, + SyncSupplier::return_snapshot_data, + |e| format!("Error sending snapshot data: {:?}", e)), + + StatusPacket => { + sync.write().on_packet(io, peer, packet_id, data); + Ok(()) + }, + // Packets that require the peer to be confirmed + _ => { + if !sync.read().peers.contains_key(&peer) { + debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer)); + return; + } + debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id); + + match id { + ConsensusDataPacket => { + SyncHandler::on_consensus_packet(io, peer, &rlp) + }, + TransactionsPacket => { + let res = { + let sync_ro = sync.read(); + SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp) + }; + if res.is_err() { + // peer sent invalid data, disconnect. + io.disable_peer(peer); + sync.write().deactivate_peer(io, peer); + } + }, + _ => { + sync.write().on_packet(io, peer, packet_id, data); } - }, - _ => { - sync.write().on_packet(io, peer, packet_id, data); } + + Ok(()) } + }; - Ok(()) - } - }; - result.unwrap_or_else(|e| { - debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e); - }) + result.unwrap_or_else(|e| { + debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e); + }) + } } /// Respond to GetBlockHeaders request @@ -148,11 +147,11 @@ impl SyncSupplier { trace!(target:"sync", "Returning single header: {:?}", hash); let mut rlp = RlpStream::new_list(1); rlp.append_raw(&hdr.into_inner(), 1); - return Ok(Some((BLOCK_HEADERS_PACKET, rlp))); + return Ok(Some((BlockHeadersPacket.id(), rlp))); } number } - None => return Ok(Some((BLOCK_HEADERS_PACKET, RlpStream::new_list(0)))) //no such header, return nothing + None => return Ok(Some((BlockHeadersPacket.id(), RlpStream::new_list(0)))) //no such header, return nothing } } else { let number = r.val_at::(0)?; @@ -202,7 +201,7 @@ impl SyncSupplier { let mut rlp = RlpStream::new_list(count as usize); rlp.append_raw(&data, count as usize); trace!(target: "sync", "{} -> GetBlockHeaders: returned {} entries", peer_id, count); - Ok(Some((BLOCK_HEADERS_PACKET, rlp))) + Ok(Some((BlockHeadersPacket.id(), rlp))) } /// Respond to GetBlockBodies request @@ -229,7 +228,7 @@ impl SyncSupplier { let mut rlp = RlpStream::new_list(added); rlp.append_raw(&data, added); trace!(target: "sync", "{} -> GetBlockBodies: returned {} entries", peer_id, added); - Ok(Some((BLOCK_BODIES_PACKET, rlp))) + Ok(Some((BlockBodiesPacket.id(), rlp))) } /// Respond to GetNodeData request @@ -261,7 +260,7 @@ impl SyncSupplier { for d in data { rlp.append(&d); } - Ok(Some((NODE_DATA_PACKET, rlp))) + Ok(Some((NodeDataPacket.id(), rlp))) } fn return_receipts(io: &SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult { @@ -287,7 +286,7 @@ impl SyncSupplier { } let mut rlp_result = RlpStream::new_list(added_headers); rlp_result.append_raw(&data, added_headers); - Ok(Some((RECEIPTS_PACKET, rlp_result))) + Ok(Some((ReceiptsPacket.id(), rlp_result))) } /// Respond to GetSnapshotManifest request @@ -310,7 +309,7 @@ impl SyncSupplier { RlpStream::new_list(0) } }; - Ok(Some((SNAPSHOT_MANIFEST_PACKET, rlp))) + Ok(Some((SnapshotManifestPacket.id(), rlp))) } /// Respond to GetSnapshotData request @@ -329,7 +328,7 @@ impl SyncSupplier { RlpStream::new_list(0) } }; - Ok(Some((SNAPSHOT_DATA_PACKET, rlp))) + Ok(Some((SnapshotDataPacket.id(), rlp))) } fn return_rlp(io: &mut SyncIo, rlp: &Rlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError> @@ -491,7 +490,7 @@ mod test { io.sender = Some(2usize); - SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_NODE_DATA_PACKET, &node_request); + SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GetNodeDataPacket.id(), &node_request); assert_eq!(1, io.packets.len()); } @@ -533,7 +532,7 @@ mod test { assert_eq!(603, rlp_result.unwrap().1.out().len()); io.sender = Some(2usize); - SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_RECEIPTS_PACKET, &receipts_request); + SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GetReceiptsPacket.id(), &receipts_request); assert_eq!(1, io.packets.len()); } } diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 0dd4cec4538..6b62b904411 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -30,8 +30,9 @@ use ethcore::miner::Miner; use ethcore::test_helpers; use sync_io::SyncIo; use io::{IoChannel, IoContext, IoHandler}; -use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID}; -use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SyncSupplier, STATUS_PACKET, RECEIPTS_PACKET, GET_SNAPSHOT_MANIFEST_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET}; +use api::WARP_SYNC_PROTOCOL_ID; +use chain::{ChainSync, SyncSupplier, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3}; +use chain::packet::SyncPacketId::*; use SyncConfig; use private_tx::SimplePrivateTxHandler; use types::BlockNumber; @@ -80,16 +81,6 @@ impl<'p, C> Drop for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { } } -fn assert_packet_id_matches_protocol(protocol: &ProtocolId, packet_id: &PacketId) { - match packet_id { - STATUS_PACKET ... RECEIPTS_PACKET => assert_eq!(*protocol, ETH_PROTOCOL), - GET_SNAPSHOT_MANIFEST_PACKET ... SIGNED_PRIVATE_TRANSACTION_PACKET => assert_eq!(*protocol, WARP_SYNC_PROTOCOL_ID), - // What about light? - _ => assert!(false) - } -} - - impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { fn disable_peer(&mut self, peer_id: PeerId) { self.disconnect_peer(peer_id); @@ -112,9 +103,7 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { Ok(()) } - fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), network::Error> { - assert_packet_id_matches_protocol(&protocol, &packet_id); - + fn send_protocol(&mut self, _protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), network::Error> { self.packets.push(TestPacket { data: data, packet_id: packet_id, @@ -242,9 +231,9 @@ impl EthPeer where C: FlushingBlockChainClient { match message { ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data), ChainMessageType::PrivateTransaction(transaction_hash, data) => - self.sync.write().propagate_private_transaction(&mut io, transaction_hash, WARP_SYNC_PROTOCOL_ID, PRIVATE_TRANSACTION_PACKET, data), + self.sync.write().propagate_private_transaction(&mut io, transaction_hash, PrivateTransactionPacket, data), ChainMessageType::SignedPrivateTransaction(transaction_hash, data) => - self.sync.write().propagate_private_transaction(&mut io, transaction_hash, WARP_SYNC_PROTOCOL_ID, SIGNED_PRIVATE_TRANSACTION_PACKET, data), + self.sync.write().propagate_private_transaction(&mut io, transaction_hash, SignedPrivateTransactionPacket, data), } } From afd14314bc7736bda6f924d17c72781541946568 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Herrero=20Carr=C3=B3n?= Date: Fri, 8 Feb 2019 09:36:34 +0100 Subject: [PATCH 06/11] Rename module. --- ethcore/sync/src/api.rs | 2 +- ethcore/sync/src/chain/handler.rs | 2 +- ethcore/sync/src/chain/mod.rs | 4 ++-- ethcore/sync/src/chain/propagator.rs | 2 +- ethcore/sync/src/chain/requester.rs | 2 +- ethcore/sync/src/chain/supplier.rs | 2 +- ethcore/sync/src/chain/{packet.rs => syncpacketid.rs} | 0 7 files changed, 7 insertions(+), 7 deletions(-) rename ethcore/sync/src/chain/{packet.rs => syncpacketid.rs} (100%) diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 5b6b878f952..eefb8dd50b1 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -39,7 +39,7 @@ use std::str::FromStr; use parking_lot::{RwLock, Mutex}; use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3}; -use chain::packet::SyncPacketId::*; +use chain::syncpacketid::SyncPacketId::*; use light::client::AsLightClient; use light::Provider; use light::net::{ diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 2748ccdfe9e..3a23754f3cf 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -33,7 +33,7 @@ use types::BlockNumber; use types::block_status::BlockStatus; use types::ids::BlockId; -use super::packet::{Packet, SyncPacketId, SyncPacketId::*}; +use super::syncpacketid::{Packet, SyncPacketId, SyncPacketId::*}; use super::{ BlockSet, ChainSync, diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 7c7810001c9..bd8bf443486 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -88,7 +88,7 @@ //! All other messages are ignored. mod handler; -pub mod packet; +pub mod syncpacketid; mod propagator; mod requester; mod supplier; @@ -119,7 +119,7 @@ use types::transaction::UnverifiedTransaction; use types::BlockNumber; use self::handler::SyncHandler; -use self::packet::{Packet, SyncPacketId, SyncPacketId::*}; +use self::syncpacketid::{Packet, SyncPacketId, SyncPacketId::*}; use self::propagator::SyncPropagator; use self::requester::SyncRequester; pub(crate) use self::supplier::SyncSupplier; diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index b0e36b0a5cc..f473cc04d0c 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -28,7 +28,7 @@ use types::transaction::SignedTransaction; use types::BlockNumber; use types::blockchain_info::BlockChainInfo; -use super::packet::{Packet, SyncPacketId, SyncPacketId::*}; +use super::syncpacketid::{Packet, SyncPacketId, SyncPacketId::*}; use super::{ random, ChainSync, diff --git a/ethcore/sync/src/chain/requester.rs b/ethcore/sync/src/chain/requester.rs index 5e80b71a249..4bae28a8242 100644 --- a/ethcore/sync/src/chain/requester.rs +++ b/ethcore/sync/src/chain/requester.rs @@ -23,7 +23,7 @@ use std::time::Instant; use sync_io::SyncIo; use types::BlockNumber; -use super::packet::{Packet, SyncPacketId, SyncPacketId::*}; +use super::syncpacketid::{Packet, SyncPacketId, SyncPacketId::*}; use super::{ BlockSet, ChainSync, diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs index 20ea3ba3bfa..f9549435541 100644 --- a/ethcore/sync/src/chain/supplier.rs +++ b/ethcore/sync/src/chain/supplier.rs @@ -26,7 +26,7 @@ use types::ids::BlockId; use sync_io::SyncIo; -use super::packet::{Packet, SyncPacketId, SyncPacketId::*}; +use super::syncpacketid::{Packet, SyncPacketId, SyncPacketId::*}; use super::{ ChainSync, diff --git a/ethcore/sync/src/chain/packet.rs b/ethcore/sync/src/chain/syncpacketid.rs similarity index 100% rename from ethcore/sync/src/chain/packet.rs rename to ethcore/sync/src/chain/syncpacketid.rs From 1d9ae78b1275333377806f3bf3ed126999658e52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Herrero=20Carr=C3=B3n?= Date: Fri, 8 Feb 2019 09:49:20 +0100 Subject: [PATCH 07/11] Remove send_protocol from SyncIo and leave just send. --- ethcore/sync/src/chain/propagator.rs | 4 ++-- ethcore/sync/src/chain/requester.rs | 4 ++-- ethcore/sync/src/sync_io.rs | 7 ++++--- ethcore/sync/src/tests/helpers.rs | 6 +++--- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index f473cc04d0c..6a0f33010f7 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -28,7 +28,7 @@ use types::transaction::SignedTransaction; use types::BlockNumber; use types::blockchain_info::BlockChainInfo; -use super::syncpacketid::{Packet, SyncPacketId, SyncPacketId::*}; +use super::syncpacketid::{SyncPacketId, SyncPacketId::*}; use super::{ random, ChainSync, @@ -340,7 +340,7 @@ impl SyncPropagator { /// Generic packet sender pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: SyncPacketId, packet: Bytes) { - if let Err(e) = sync.send_protocol(packet_id.protocol(), peer_id, packet_id.id(), packet) { + if let Err(e) = sync.send(peer_id, packet_id, packet) { debug!(target:"sync", "Error sending packet: {:?}", e); sync.disconnect_peer(peer_id); } diff --git a/ethcore/sync/src/chain/requester.rs b/ethcore/sync/src/chain/requester.rs index 4bae28a8242..79d5e764b46 100644 --- a/ethcore/sync/src/chain/requester.rs +++ b/ethcore/sync/src/chain/requester.rs @@ -23,7 +23,7 @@ use std::time::Instant; use sync_io::SyncIo; use types::BlockNumber; -use super::syncpacketid::{Packet, SyncPacketId, SyncPacketId::*}; +use super::syncpacketid::{SyncPacketId, SyncPacketId::*}; use super::{ BlockSet, ChainSync, @@ -135,7 +135,7 @@ impl SyncRequester { peer.asking = asking; peer.ask_time = Instant::now(); - let result = io.send_protocol(packet_id.protocol(), peer_id, packet_id.id(), packet); + let result = io.send(peer_id, packet_id, packet); if let Err(e) = result { debug!(target:"sync", "Error sending request: {:?}", e); diff --git a/ethcore/sync/src/sync_io.rs b/ethcore/sync/src/sync_io.rs index a497e13402a..dbc2c299efe 100644 --- a/ethcore/sync/src/sync_io.rs +++ b/ethcore/sync/src/sync_io.rs @@ -15,6 +15,7 @@ // along with Parity Ethereum. If not, see . use std::collections::HashMap; +use chain::syncpacketid::{Packet, SyncPacketId}; use network::{NetworkContext, PeerId, PacketId, Error, SessionInfo, ProtocolId}; use bytes::Bytes; use ethcore::client::BlockChainClient; @@ -33,7 +34,7 @@ pub trait SyncIo { /// Respond to current request with a packet. Can be called from an IO handler for incoming packet. fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), Error>; /// Send a packet to a peer using specified protocol. - fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error>; + fn send(&mut self, peer_id: PeerId, packet_id: SyncPacketId, data: Vec) -> Result<(), Error>; /// Get the blockchain fn chain(&self) -> &BlockChainClient; /// Get the snapshot service. @@ -96,8 +97,8 @@ impl<'s> SyncIo for NetSyncIo<'s> { self.network.respond(packet_id, data) } - fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error>{ - self.network.send_protocol(protocol, peer_id, packet_id, data) + fn send(&mut self, peer_id: PeerId, packet_id: SyncPacketId, data: Vec) -> Result<(), Error>{ + self.network.send_protocol(packet_id.protocol(), peer_id, packet_id.id(), data) } fn chain(&self) -> &BlockChainClient { diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 6b62b904411..1aa4d7597b8 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -32,7 +32,7 @@ use sync_io::SyncIo; use io::{IoChannel, IoContext, IoHandler}; use api::WARP_SYNC_PROTOCOL_ID; use chain::{ChainSync, SyncSupplier, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3}; -use chain::packet::SyncPacketId::*; +use chain::syncpacketid::{Packet, SyncPacketId, SyncPacketId::*}; use SyncConfig; use private_tx::SimplePrivateTxHandler; use types::BlockNumber; @@ -103,10 +103,10 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { Ok(()) } - fn send_protocol(&mut self, _protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), network::Error> { + fn send(&mut self,peer_id: PeerId, packet_id: SyncPacketId, data: Vec) -> Result<(), network::Error> { self.packets.push(TestPacket { data: data, - packet_id: packet_id, + packet_id: packet_id.id(), recipient: peer_id, }); Ok(()) From 16afb114d8ceffd60f46480263e35890eabd11a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Herrero=20Carr=C3=B3n?= Date: Fri, 8 Feb 2019 09:57:08 +0100 Subject: [PATCH 08/11] Rename trait Packet to PacketInfo. --- ethcore/sync/src/chain/handler.rs | 2 +- ethcore/sync/src/chain/mod.rs | 2 +- ethcore/sync/src/chain/supplier.rs | 2 +- ethcore/sync/src/chain/syncpacketid.rs | 4 ++-- ethcore/sync/src/sync_io.rs | 2 +- ethcore/sync/src/tests/helpers.rs | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 3a23754f3cf..4e1a2cf89bd 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -33,7 +33,7 @@ use types::BlockNumber; use types::block_status::BlockStatus; use types::ids::BlockId; -use super::syncpacketid::{Packet, SyncPacketId, SyncPacketId::*}; +use super::syncpacketid::{PacketInfo, SyncPacketId, SyncPacketId::*}; use super::{ BlockSet, ChainSync, diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index bd8bf443486..428663c1118 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -119,7 +119,7 @@ use types::transaction::UnverifiedTransaction; use types::BlockNumber; use self::handler::SyncHandler; -use self::syncpacketid::{Packet, SyncPacketId, SyncPacketId::*}; +use self::syncpacketid::{PacketInfo, SyncPacketId, SyncPacketId::*}; use self::propagator::SyncPropagator; use self::requester::SyncRequester; pub(crate) use self::supplier::SyncSupplier; diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs index f9549435541..421d6169a47 100644 --- a/ethcore/sync/src/chain/supplier.rs +++ b/ethcore/sync/src/chain/supplier.rs @@ -26,7 +26,7 @@ use types::ids::BlockId; use sync_io::SyncIo; -use super::syncpacketid::{Packet, SyncPacketId, SyncPacketId::*}; +use super::syncpacketid::{PacketInfo, SyncPacketId, SyncPacketId::*}; use super::{ ChainSync, diff --git a/ethcore/sync/src/chain/syncpacketid.rs b/ethcore/sync/src/chain/syncpacketid.rs index 39f95a3959a..fc522e789ef 100644 --- a/ethcore/sync/src/chain/syncpacketid.rs +++ b/ethcore/sync/src/chain/syncpacketid.rs @@ -31,12 +31,12 @@ pub enum SyncPacketId { use self::SyncPacketId::*; -pub trait Packet { +pub trait PacketInfo { fn id(&self) -> PacketId; fn protocol(&self) -> ProtocolId; } -impl Packet for SyncPacketId { +impl PacketInfo for SyncPacketId { fn protocol(&self) -> ProtocolId { match self { StatusPacket | diff --git a/ethcore/sync/src/sync_io.rs b/ethcore/sync/src/sync_io.rs index dbc2c299efe..a896b3266b4 100644 --- a/ethcore/sync/src/sync_io.rs +++ b/ethcore/sync/src/sync_io.rs @@ -15,7 +15,7 @@ // along with Parity Ethereum. If not, see . use std::collections::HashMap; -use chain::syncpacketid::{Packet, SyncPacketId}; +use chain::syncpacketid::{PacketInfo, SyncPacketId}; use network::{NetworkContext, PeerId, PacketId, Error, SessionInfo, ProtocolId}; use bytes::Bytes; use ethcore::client::BlockChainClient; diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 1aa4d7597b8..48bc34a95b3 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -32,7 +32,7 @@ use sync_io::SyncIo; use io::{IoChannel, IoContext, IoHandler}; use api::WARP_SYNC_PROTOCOL_ID; use chain::{ChainSync, SyncSupplier, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3}; -use chain::syncpacketid::{Packet, SyncPacketId, SyncPacketId::*}; +use chain::syncpacketid::{PacketInfo, SyncPacketId, SyncPacketId::*}; use SyncConfig; use private_tx::SimplePrivateTxHandler; use types::BlockNumber; From 81022bfd08ab342318fcec123cb1471c2b2779f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Herrero=20Carr=C3=B3n?= Date: Fri, 8 Feb 2019 10:09:34 +0100 Subject: [PATCH 09/11] Document. --- ethcore/sync/src/chain/syncpacketid.rs | 35 +++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/ethcore/sync/src/chain/syncpacketid.rs b/ethcore/sync/src/chain/syncpacketid.rs index fc522e789ef..f795e6b7f52 100644 --- a/ethcore/sync/src/chain/syncpacketid.rs +++ b/ethcore/sync/src/chain/syncpacketid.rs @@ -1,6 +1,36 @@ +// Copyright 2015-2019 Parity Technologies (UK) Ltd. +// This file is part of Parity Ethereum. + +// Parity Ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Ethereum. If not, see . + +//! When sending packets over p2p we specify both which subprotocol +//! to use and what kind of packet we are sending (through a packet id). +//! Likewise when receiving packets from other peers we decode the +//! subprotocol and the packet id. This module helps coupling both +//! pieces of information together and provides an easy mechanism +//! to convert to/from the packet id values transmitted over the +//! wire. + use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID}; use network::{PacketId, ProtocolId}; +/// An enum that defines all known packet ids in the context of +/// synchronization and provides a mechanism to convert from +/// packet ids (of type PacketId or u8) directly read from the network +/// to enum variants. This implicitly provides a mechanism to +/// check whether a given packet id is known, and to prevent +/// packet id clashes when defining new ids. enum_from_primitive! { #[derive(Clone, Copy, Debug, PartialEq)] pub enum SyncPacketId { @@ -28,14 +58,17 @@ pub enum SyncPacketId { } } - use self::SyncPacketId::*; +/// Provide both subprotocol and packet id information within the +/// same object. pub trait PacketInfo { fn id(&self) -> PacketId; fn protocol(&self) -> ProtocolId; } +// The mechanism to match packet ids and protocol may be improved +// through some macro magic, but for now this works. impl PacketInfo for SyncPacketId { fn protocol(&self) -> ProtocolId { match self { From 8ea473f259080229464d123956a4eabc65081280 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Herrero=20Carr=C3=B3n?= Date: Mon, 11 Feb 2019 12:47:14 +0100 Subject: [PATCH 10/11] Remove glob imports --- ethcore/sync/src/api.rs | 2 +- ethcore/sync/src/chain/handler.rs | 15 ++++++++++++++- ethcore/sync/src/chain/mod.rs | 7 ++++++- ethcore/sync/src/chain/propagator.rs | 9 ++++++++- ethcore/sync/src/chain/requester.rs | 10 +++++++++- ethcore/sync/src/chain/supplier.rs | 19 ++++++++++++++++++- ethcore/sync/src/tests/helpers.rs | 4 +++- 7 files changed, 59 insertions(+), 7 deletions(-) diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index b78232e4aa8..2df99495eb2 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -40,7 +40,7 @@ use std::str::FromStr; use parking_lot::{RwLock, Mutex}; use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3}; -use chain::syncpacketid::SyncPacketId::*; +use chain::syncpacketid::SyncPacketId::{PrivateTransactionPacket, SignedPrivateTransactionPacket}; use light::client::AsLightClient; use light::Provider; use light::net::{ diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index e5d0b1bfdb8..21834f407f0 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -34,7 +34,20 @@ use types::BlockNumber; use types::block_status::BlockStatus; use types::ids::BlockId; -use super::syncpacketid::{PacketInfo, SyncPacketId, SyncPacketId::*}; +use super::syncpacketid::{PacketInfo, SyncPacketId}; +use super::syncpacketid::SyncPacketId::{ + StatusPacket, + NewBlockHashesPacket, + BlockHeadersPacket, + BlockBodiesPacket, + NewBlockPacket, + ReceiptsPacket, + SnapshotManifestPacket, + SnapshotDataPacket, + PrivateTransactionPacket, + SignedPrivateTransactionPacket, +}; + use super::{ BlockSet, ChainSync, diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index caf60f7b468..3a836e813f0 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -120,7 +120,12 @@ use types::transaction::UnverifiedTransaction; use types::BlockNumber; use self::handler::SyncHandler; -use self::syncpacketid::{PacketInfo, SyncPacketId, SyncPacketId::*}; +use self::syncpacketid::{PacketInfo, SyncPacketId}; +use self::syncpacketid:: SyncPacketId::{ + NewBlockPacket, + StatusPacket, +}; + use self::propagator::SyncPropagator; use self::requester::SyncRequester; pub(crate) use self::supplier::SyncSupplier; diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 9b08262140a..c581a693154 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -29,7 +29,14 @@ use types::transaction::SignedTransaction; use types::BlockNumber; use types::blockchain_info::BlockChainInfo; -use super::syncpacketid::{SyncPacketId, SyncPacketId::*}; +use super::syncpacketid::SyncPacketId; +use super::syncpacketid::SyncPacketId::{ + NewBlockHashesPacket, + TransactionsPacket, + NewBlockPacket, + ConsensusDataPacket, +}; + use super::{ random, ChainSync, diff --git a/ethcore/sync/src/chain/requester.rs b/ethcore/sync/src/chain/requester.rs index 79d5e764b46..9ae332bdc77 100644 --- a/ethcore/sync/src/chain/requester.rs +++ b/ethcore/sync/src/chain/requester.rs @@ -23,7 +23,15 @@ use std::time::Instant; use sync_io::SyncIo; use types::BlockNumber; -use super::syncpacketid::{SyncPacketId, SyncPacketId::*}; +use super::syncpacketid::SyncPacketId; +use super::syncpacketid::SyncPacketId::{ + GetBlockHeadersPacket, + GetBlockBodiesPacket, + GetReceiptsPacket, + GetSnapshotManifestPacket, + GetSnapshotDataPacket, +}; + use super::{ BlockSet, ChainSync, diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs index 908586427f3..4707d48a27c 100644 --- a/ethcore/sync/src/chain/supplier.rs +++ b/ethcore/sync/src/chain/supplier.rs @@ -26,7 +26,24 @@ use types::ids::BlockId; use sync_io::SyncIo; -use super::syncpacketid::{PacketInfo, SyncPacketId, SyncPacketId::*}; +use super::syncpacketid::{PacketInfo, SyncPacketId}; +use super::syncpacketid::SyncPacketId::{ + StatusPacket, + TransactionsPacket, + GetBlockHeadersPacket, + BlockHeadersPacket, + GetBlockBodiesPacket, + BlockBodiesPacket, + GetNodeDataPacket, + NodeDataPacket, + GetReceiptsPacket, + ReceiptsPacket, + GetSnapshotManifestPacket, + SnapshotManifestPacket, + GetSnapshotDataPacket, + SnapshotDataPacket, + ConsensusDataPacket, +}; use super::{ ChainSync, diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 4786da3bb81..11e65d99de1 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -32,7 +32,9 @@ use sync_io::SyncIo; use io::{IoChannel, IoContext, IoHandler}; use api::WARP_SYNC_PROTOCOL_ID; use chain::{ChainSync, SyncSupplier, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3}; -use chain::syncpacketid::{PacketInfo, SyncPacketId, SyncPacketId::*}; +use chain::syncpacketid::{PacketInfo, SyncPacketId}; +use chain::syncpacketid::SyncPacketId::{PrivateTransactionPacket, SignedPrivateTransactionPacket}; + use SyncConfig; use private_tx::SimplePrivateTxHandler; use types::BlockNumber; From 889d17d3ef15345912ed7f840c2cf8b5c7180712 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Herrero=20Carr=C3=B3n?= Date: Mon, 11 Feb 2019 17:28:24 +0100 Subject: [PATCH 11/11] Rename SyncPacketId to SyncPacket and corresponding file --- ethcore/sync/src/api.rs | 2 +- ethcore/sync/src/chain/handler.rs | 6 +++--- ethcore/sync/src/chain/mod.rs | 8 ++++---- ethcore/sync/src/chain/propagator.rs | 8 ++++---- ethcore/sync/src/chain/requester.rs | 6 +++--- ethcore/sync/src/chain/supplier.rs | 6 +++--- .../src/chain/{syncpacketid.rs => sync_packet.rs} | 12 ++++++------ ethcore/sync/src/sync_io.rs | 6 +++--- ethcore/sync/src/tests/helpers.rs | 6 +++--- 9 files changed, 30 insertions(+), 30 deletions(-) rename ethcore/sync/src/chain/{syncpacketid.rs => sync_packet.rs} (93%) diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 2df99495eb2..f694d077405 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -40,7 +40,7 @@ use std::str::FromStr; use parking_lot::{RwLock, Mutex}; use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3}; -use chain::syncpacketid::SyncPacketId::{PrivateTransactionPacket, SignedPrivateTransactionPacket}; +use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket}; use light::client::AsLightClient; use light::Provider; use light::net::{ diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 21834f407f0..63ab8916139 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -34,8 +34,8 @@ use types::BlockNumber; use types::block_status::BlockStatus; use types::ids::BlockId; -use super::syncpacketid::{PacketInfo, SyncPacketId}; -use super::syncpacketid::SyncPacketId::{ +use super::sync_packet::{PacketInfo, SyncPacket}; +use super::sync_packet::SyncPacket::{ StatusPacket, NewBlockHashesPacket, BlockHeadersPacket, @@ -72,7 +72,7 @@ impl SyncHandler { /// Handle incoming packet from peer pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { let rlp = Rlp::new(data); - if let Some(packet_id) = SyncPacketId::from_u8(packet_id) { + if let Some(packet_id) = SyncPacket::from_u8(packet_id) { let result = match packet_id { StatusPacket => SyncHandler::on_peer_status(sync, io, peer, &rlp), BlockHeadersPacket => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp), diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 3a836e813f0..81f1ccffe94 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -88,7 +88,7 @@ //! All other messages are ignored. mod handler; -pub mod syncpacketid; +pub mod sync_packet; mod propagator; mod requester; mod supplier; @@ -120,8 +120,8 @@ use types::transaction::UnverifiedTransaction; use types::BlockNumber; use self::handler::SyncHandler; -use self::syncpacketid::{PacketInfo, SyncPacketId}; -use self::syncpacketid:: SyncPacketId::{ +use self::sync_packet::{PacketInfo, SyncPacket}; +use self::sync_packet::SyncPacket::{ NewBlockPacket, StatusPacket, }; @@ -1316,7 +1316,7 @@ impl ChainSync { } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacketId, packet: Bytes) { + pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) { SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet); } } diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index c581a693154..c3654553ffd 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -29,8 +29,8 @@ use types::transaction::SignedTransaction; use types::BlockNumber; use types::blockchain_info::BlockChainInfo; -use super::syncpacketid::SyncPacketId; -use super::syncpacketid::SyncPacketId::{ +use super::sync_packet::SyncPacket; +use super::sync_packet::SyncPacket::{ NewBlockHashesPacket, TransactionsPacket, NewBlockPacket, @@ -294,7 +294,7 @@ impl SyncPropagator { } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacketId, packet: Bytes) { + pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) { let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash)); if lucky_peers.is_empty() { error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected"); @@ -325,7 +325,7 @@ impl SyncPropagator { } /// Generic packet sender - pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: SyncPacketId, packet: Bytes) { + pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: SyncPacket, packet: Bytes) { if let Err(e) = sync.send(peer_id, packet_id, packet) { debug!(target:"sync", "Error sending packet: {:?}", e); sync.disconnect_peer(peer_id); diff --git a/ethcore/sync/src/chain/requester.rs b/ethcore/sync/src/chain/requester.rs index 9ae332bdc77..31d3ce59006 100644 --- a/ethcore/sync/src/chain/requester.rs +++ b/ethcore/sync/src/chain/requester.rs @@ -23,8 +23,8 @@ use std::time::Instant; use sync_io::SyncIo; use types::BlockNumber; -use super::syncpacketid::SyncPacketId; -use super::syncpacketid::SyncPacketId::{ +use super::sync_packet::SyncPacket; +use super::sync_packet::SyncPacket::{ GetBlockHeadersPacket, GetBlockBodiesPacket, GetReceiptsPacket, @@ -135,7 +135,7 @@ impl SyncRequester { } /// Generic request sender - fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: SyncPacketId, packet: Bytes) { + fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: SyncPacket, packet: Bytes) { if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { if peer.asking != PeerAsking::Nothing { warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs index 4707d48a27c..7e71e6aeec7 100644 --- a/ethcore/sync/src/chain/supplier.rs +++ b/ethcore/sync/src/chain/supplier.rs @@ -26,8 +26,8 @@ use types::ids::BlockId; use sync_io::SyncIo; -use super::syncpacketid::{PacketInfo, SyncPacketId}; -use super::syncpacketid::SyncPacketId::{ +use super::sync_packet::{PacketInfo, SyncPacket}; +use super::sync_packet::SyncPacket::{ StatusPacket, TransactionsPacket, GetBlockHeadersPacket, @@ -66,7 +66,7 @@ impl SyncSupplier { pub fn dispatch_packet(sync: &RwLock, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { let rlp = Rlp::new(data); - if let Some(id) = SyncPacketId::from_u8(packet_id) { + if let Some(id) = SyncPacket::from_u8(packet_id) { let result = match id { GetBlockBodiesPacket => SyncSupplier::return_rlp( io, &rlp, peer, diff --git a/ethcore/sync/src/chain/syncpacketid.rs b/ethcore/sync/src/chain/sync_packet.rs similarity index 93% rename from ethcore/sync/src/chain/syncpacketid.rs rename to ethcore/sync/src/chain/sync_packet.rs index f795e6b7f52..3891090f65e 100644 --- a/ethcore/sync/src/chain/syncpacketid.rs +++ b/ethcore/sync/src/chain/sync_packet.rs @@ -33,7 +33,7 @@ use network::{PacketId, ProtocolId}; /// packet id clashes when defining new ids. enum_from_primitive! { #[derive(Clone, Copy, Debug, PartialEq)] -pub enum SyncPacketId { +pub enum SyncPacket { StatusPacket = 0x00, NewBlockHashesPacket = 0x01, TransactionsPacket = 0x02, @@ -58,7 +58,7 @@ pub enum SyncPacketId { } } -use self::SyncPacketId::*; +use self::SyncPacket::*; /// Provide both subprotocol and packet id information within the /// same object. @@ -69,7 +69,7 @@ pub trait PacketInfo { // The mechanism to match packet ids and protocol may be improved // through some macro magic, but for now this works. -impl PacketInfo for SyncPacketId { +impl PacketInfo for SyncPacket { fn protocol(&self) -> ProtocolId { match self { StatusPacket | @@ -114,17 +114,17 @@ mod tests { #[test] fn packet_ids_from_u8_when_from_primitive_zero_then_equals_status_packet() { - assert_eq!(SyncPacketId::from_u8(0x00), Some(StatusPacket)); + assert_eq!(SyncPacket::from_u8(0x00), Some(StatusPacket)); } #[test] fn packet_ids_from_u8_when_from_primitive_eleven_then_equals_get_snapshot_manifest_packet() { - assert_eq!(SyncPacketId::from_u8(0x11), Some(GetSnapshotManifestPacket)); + assert_eq!(SyncPacket::from_u8(0x11), Some(GetSnapshotManifestPacket)); } #[test] fn packet_ids_from_u8_when_invalid_packet_id_then_none() { - assert!(SyncPacketId::from_u8(0x99).is_none()); + assert!(SyncPacket::from_u8(0x99).is_none()); } #[test] diff --git a/ethcore/sync/src/sync_io.rs b/ethcore/sync/src/sync_io.rs index 40b8a26b841..56bf98ab2ee 100644 --- a/ethcore/sync/src/sync_io.rs +++ b/ethcore/sync/src/sync_io.rs @@ -15,7 +15,7 @@ // along with Parity Ethereum. If not, see . use std::collections::HashMap; -use chain::syncpacketid::{PacketInfo, SyncPacketId}; +use chain::sync_packet::{PacketInfo, SyncPacket}; use network::{NetworkContext, PeerId, PacketId, Error, SessionInfo, ProtocolId}; use network::client_version::ClientVersion; use bytes::Bytes; @@ -35,7 +35,7 @@ pub trait SyncIo { /// Respond to current request with a packet. Can be called from an IO handler for incoming packet. fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), Error>; /// Send a packet to a peer using specified protocol. - fn send(&mut self, peer_id: PeerId, packet_id: SyncPacketId, data: Vec) -> Result<(), Error>; + fn send(&mut self, peer_id: PeerId, packet_id: SyncPacket, data: Vec) -> Result<(), Error>; /// Get the blockchain fn chain(&self) -> &BlockChainClient; /// Get the snapshot service. @@ -98,7 +98,7 @@ impl<'s> SyncIo for NetSyncIo<'s> { self.network.respond(packet_id, data) } - fn send(&mut self, peer_id: PeerId, packet_id: SyncPacketId, data: Vec) -> Result<(), Error>{ + fn send(&mut self, peer_id: PeerId, packet_id: SyncPacket, data: Vec) -> Result<(), Error>{ self.network.send_protocol(packet_id.protocol(), peer_id, packet_id.id(), data) } diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 11e65d99de1..8bc4b542e28 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -32,8 +32,8 @@ use sync_io::SyncIo; use io::{IoChannel, IoContext, IoHandler}; use api::WARP_SYNC_PROTOCOL_ID; use chain::{ChainSync, SyncSupplier, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3}; -use chain::syncpacketid::{PacketInfo, SyncPacketId}; -use chain::syncpacketid::SyncPacketId::{PrivateTransactionPacket, SignedPrivateTransactionPacket}; +use chain::sync_packet::{PacketInfo, SyncPacket}; +use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket}; use SyncConfig; use private_tx::SimplePrivateTxHandler; @@ -105,7 +105,7 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { Ok(()) } - fn send(&mut self,peer_id: PeerId, packet_id: SyncPacketId, data: Vec) -> Result<(), network::Error> { + fn send(&mut self,peer_id: PeerId, packet_id: SyncPacket, data: Vec) -> Result<(), network::Error> { self.packets.push(TestPacket { data: data, packet_id: packet_id.id(),