diff --git a/Cargo.lock b/Cargo.lock index 6a8b5b8946bb9..7fe0b0e3cfd9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1917,9 +1917,6 @@ dependencies = [ "polkadot-consensus 0.1.0", "polkadot-primitives 0.1.0", "rhododendron 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.22 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-bft 0.1.0", "substrate-codec 0.1.0", "substrate-network 0.1.0", @@ -2014,8 +2011,6 @@ name = "polkadot-statement-table" version = "0.1.0" dependencies = [ "polkadot-primitives 0.1.0", - "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-codec 0.1.0", "substrate-primitives 0.1.0", ] @@ -2765,19 +2760,12 @@ dependencies = [ "linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.22 (registry+https://github.com/rust-lang/crates.io-index)", - "substrate-bft 0.1.0", "substrate-client 0.1.0", "substrate-codec 0.1.0", "substrate-keyring 0.1.0", "substrate-network-libp2p 0.1.0", "substrate-primitives 0.1.0", "substrate-runtime-primitives 0.1.0", - "substrate-runtime-support 0.1.0", - "substrate-serializer 0.1.0", "substrate-test-client 0.1.0", ] diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index a4b1900d22d31..791010b12876a 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -222,19 +222,19 @@ pub fn run(args: I, worker: W) -> error::Result<()> where // TODO [rob]: collation node implementation // This isn't a thing. Different parachains will have their own collator executables and // maybe link to libpolkadot to get a light-client. - service::Role::LIGHT + service::Roles::LIGHT } else if matches.is_present("light") { info!("Starting (light)"); config.execution_strategy = service::ExecutionStrategy::NativeWhenPossible; - service::Role::LIGHT + service::Roles::LIGHT } else if matches.is_present("validator") || matches.is_present("dev") { info!("Starting validator"); config.execution_strategy = service::ExecutionStrategy::Both; - service::Role::AUTHORITY + service::Roles::AUTHORITY } else { info!("Starting (heavy)"); config.execution_strategy = service::ExecutionStrategy::NativeWhenPossible; - service::Role::FULL + service::Roles::FULL }; if let Some(s) = matches.value_of("execution") { @@ -303,7 +303,7 @@ pub fn run(args: I, worker: W) -> error::Result<()> where None }; - match role == service::Role::LIGHT { + match role == service::Roles::LIGHT { true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf, worker)?, false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf, worker)?, } diff --git a/polkadot/network/Cargo.toml b/polkadot/network/Cargo.toml index 71ab17d2affb9..37d36ea205e50 100644 --- a/polkadot/network/Cargo.toml +++ b/polkadot/network/Cargo.toml @@ -5,9 +5,6 @@ authors = ["Parity Technologies "] description = "Polkadot-specific networking protocol" [dependencies] -serde = "1.0" -serde_derive = "1.0" -serde_json = "1.0" parking_lot = "0.4" polkadot-api = { path = "../api" } polkadot-consensus = { path = "../consensus" } diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index 12ddade1de1e8..7070eece88c20 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -18,6 +18,7 @@ use polkadot_primitives::{AccountId, Hash}; use polkadot_primitives::parachain::{Id as ParaId, Collation}; +use codec; use futures::sync::oneshot; @@ -27,12 +28,28 @@ use std::time::{Duration, Instant}; const COLLATION_LIFETIME: Duration = Duration::from_secs(60 * 5); /// The role of the collator. Whether they're the primary or backup for this parachain. -#[derive(PartialEq, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Debug, Clone, Copy)] pub enum Role { /// Primary collators should send collations whenever it's time. - Primary, + Primary = 0, /// Backup collators should not. - Backup, + Backup = 1, +} + +impl codec::Encode for Role { + fn encode_to(&self, dest: &mut T) { + dest.push_byte(*self as u8); + } +} + +impl codec::Decode for Role { + fn decode(input: &mut I) -> Option { + match input.read_byte()? { + x if x == Role::Primary as u8 => Some(Role::Primary), + x if x == Role::Backup as u8 => Some(Role::Backup), + _ => None, + } + } } /// A maintenance action for the collator set. diff --git a/polkadot/network/src/consensus.rs b/polkadot/network/src/consensus.rs index 5b6570dc9ec4d..6b1b141ecacf9 100644 --- a/polkadot/network/src/consensus.rs +++ b/polkadot/network/src/consensus.rs @@ -26,6 +26,7 @@ use polkadot_api::{PolkadotApi, LocalPolkadotApi}; use polkadot_consensus::{Network, SharedTable, Collators}; use polkadot_primitives::{AccountId, Block, Hash, SessionKey}; use polkadot_primitives::parachain::{Id as ParaId, Collation}; +use codec::Decode; use futures::prelude::*; use futures::sync::mpsc; @@ -175,7 +176,7 @@ impl MessageProcessTask

{ } } ConsensusMessage::ChainSpecific(msg, _) => { - if let Ok(Message::Statement(parent_hash, statement)) = ::serde_json::from_slice(&msg) { + if let Some(Message::Statement(parent_hash, statement)) = Decode::decode(&mut msg.as_slice()) { if ::polkadot_consensus::check_statement(&statement.statement, &statement.signature, statement.sender, &parent_hash) { self.table_router.import_statement(statement); } diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index b419c9c88d27d..7ccd69ea66b0f 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -20,11 +20,6 @@ //! parachain block and extrinsic data fetching, communication between collators and validators, //! and more. -extern crate serde; -#[macro_use] -extern crate serde_derive; -extern crate serde_json; - extern crate substrate_bft as bft; extern crate substrate_codec as codec; extern crate substrate_network; @@ -47,7 +42,7 @@ mod collator_pool; mod router; pub mod consensus; -use codec::{Decode, Encode}; +use codec::{Decode, Encode, Input, Output}; use futures::sync::oneshot; use parking_lot::Mutex; use polkadot_consensus::{Statement, SignedStatement, GenericStatement}; @@ -188,7 +183,6 @@ impl CurrentConsensus { } /// Polkadot-specific messages. -#[derive(Serialize, Deserialize)] pub enum Message { /// signed statement and localized parent hash. Statement(Hash, SignedStatement), @@ -205,8 +199,58 @@ pub enum Message { Collation(Hash, Collation), } +impl Encode for Message { + fn encode_to(&self, dest: &mut T) { + match *self { + Message::Statement(ref h, ref s) => { + dest.push_byte(0); + dest.push(h); + dest.push(s); + } + Message::SessionKey(ref h, ref k) => { + dest.push_byte(1); + dest.push(h); + dest.push(k); + } + Message::RequestBlockData(ref id, ref d) => { + dest.push_byte(2); + dest.push(id); + dest.push(d); + } + Message::BlockData(ref id, ref d) => { + dest.push_byte(3); + dest.push(id); + dest.push(d); + } + Message::CollatorRole(ref r) => { + dest.push_byte(4); + dest.push(r); + } + Message::Collation(ref h, ref c) => { + dest.push_byte(5); + dest.push(h); + dest.push(c); + } + } + } +} + +impl Decode for Message { + fn decode(input: &mut I) -> Option { + match input.read_byte()? { + 0 => Some(Message::Statement(Decode::decode(input)?, Decode::decode(input)?)), + 1 => Some(Message::SessionKey(Decode::decode(input)?, Decode::decode(input)?)), + 2 => Some(Message::RequestBlockData(Decode::decode(input)?, Decode::decode(input)?)), + 3 => Some(Message::BlockData(Decode::decode(input)?, Decode::decode(input)?)), + 4 => Some(Message::CollatorRole(Decode::decode(input)?)), + 5 => Some(Message::Collation(Decode::decode(input)?, Decode::decode(input)?)), + _ => None, + } + } +} + fn send_polkadot_message(ctx: &mut Context, to: PeerId, message: Message) { - let encoded = ::serde_json::to_vec(&message).expect("serialization of messages infallible; qed"); + let encoded = message.encode(); ctx.send_message(to, generic_message::Message::ChainSpecific(encoded)) } @@ -244,9 +288,7 @@ impl PolkadotProtocol { /// Send a statement to a validator. fn send_statement(&mut self, ctx: &mut Context, _val: SessionKey, parent_hash: Hash, statement: SignedStatement) { // TODO: something more targeted than gossip. - let raw = ::serde_json::to_vec(&Message::Statement(parent_hash, statement)) - .expect("message serialization infallible; qed"); - + let raw = Message::Statement(parent_hash, statement).encode(); self.consensus_gossip.multicast_chain_specific(ctx, raw, parent_hash); } @@ -427,7 +469,7 @@ impl Specialization for PolkadotProtocol { ); } - let validator = status.roles.iter().any(|r| *r == message::Role::Authority); + let validator = status.roles.contains(substrate_network::Roles::AUTHORITY); let send_key = validator || local_status.collating_for.is_some(); self.peers.insert(peer_id, PeerInfo { @@ -436,7 +478,7 @@ impl Specialization for PolkadotProtocol { validator, }); - self.consensus_gossip.new_peer(ctx, peer_id, &status.roles); + self.consensus_gossip.new_peer(ctx, peer_id, status.roles); if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) { send_polkadot_message( ctx, @@ -497,11 +539,11 @@ impl Specialization for PolkadotProtocol { self.consensus_gossip.on_bft_message(ctx, peer_id, msg) } generic_message::Message::ChainSpecific(raw) => { - match serde_json::from_slice(&raw) { - Ok(msg) => self.on_polkadot_message(ctx, peer_id, raw, msg), - Err(e) => { - trace!(target: "p_net", "Bad message from {}: {}", peer_id, e); - ctx.disable_peer(peer_id, "Unknown Polkadot-protocol reason"); + match Message::decode(&mut raw.as_slice()) { + Some(msg) => self.on_polkadot_message(ctx, peer_id, raw, msg), + None => { + trace!(target: "p_net", "Bad message from {}", peer_id); + ctx.disable_peer(peer_id, "Invalid polkadot protocol message format"); } } } diff --git a/polkadot/network/src/tests.rs b/polkadot/network/src/tests.rs index deec3b8129d0a..06d679dcd0eb5 100644 --- a/polkadot/network/src/tests.rs +++ b/polkadot/network/src/tests.rs @@ -24,7 +24,7 @@ use polkadot_primitives::{Block, Hash, SessionKey}; use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData}; use substrate_primitives::H512; use codec::Encode; -use substrate_network::{PeerId, PeerInfo, ClientHandle, Context, message::Message as SubstrateMessage, message::Role, specialization::Specialization, generic_message::Message as GenericMessage}; +use substrate_network::{PeerId, PeerInfo, ClientHandle, Context, Roles, message::Message as SubstrateMessage, specialization::Specialization, generic_message::Message as GenericMessage}; use std::sync::Arc; use futures::Future; @@ -62,7 +62,7 @@ impl TestContext { fn has_message(&self, to: PeerId, message: Message) -> bool { use substrate_network::generic_message::Message as GenericMessage; - let encoded = ::serde_json::to_vec(&message).unwrap(); + let encoded = message.encode(); self.messages.iter().any(|&(ref peer, ref msg)| match msg { GenericMessage::ChainSpecific(ref data) => peer == &to && data == &encoded, _ => false, @@ -70,7 +70,7 @@ impl TestContext { } } -fn make_status(status: &Status, roles: Vec) -> FullStatus { +fn make_status(status: &Status, roles: Roles) -> FullStatus { FullStatus { version: 1, roles, @@ -78,9 +78,6 @@ fn make_status(status: &Status, roles: Vec) -> FullStatus { best_hash: Default::default(), genesis_hash: Default::default(), chain_status: status.encode(), - parachain_id: None, - validator_id: None, - validator_signature: None, } } @@ -97,7 +94,7 @@ fn make_consensus(parent_hash: Hash, local_key: SessionKey) -> (CurrentConsensus } fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: PeerId, message: Message) { - let encoded = ::serde_json::to_vec(&message).unwrap(); + let encoded = message.encode(); protocol.on_message(ctx, from, GenericMessage::ChainSpecific(encoded)); } @@ -115,7 +112,7 @@ fn sends_session_key() { { let mut ctx = TestContext::default(); - protocol.on_connect(&mut ctx, peer_a, make_status(&validator_status, vec![Role::Authority])); + protocol.on_connect(&mut ctx, peer_a, make_status(&validator_status, Roles::AUTHORITY)); assert!(ctx.messages.is_empty()); } @@ -129,7 +126,7 @@ fn sends_session_key() { { let mut ctx = TestContext::default(); - protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, vec![])); + protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, Roles::NONE)); assert!(ctx.has_message(peer_b, Message::SessionKey(parent_hash, local_key))); } } @@ -171,7 +168,7 @@ fn fetches_from_those_with_knowledge() { // connect peer A { let mut ctx = TestContext::default(); - protocol.on_connect(&mut ctx, peer_a, make_status(&status, vec![Role::Authority])); + protocol.on_connect(&mut ctx, peer_a, make_status(&status, Roles::AUTHORITY)); assert!(ctx.has_message(peer_a, Message::SessionKey(parent_hash, local_key))); } @@ -187,7 +184,7 @@ fn fetches_from_those_with_knowledge() { // peer B connects and sends session key. request already assigned to A { let mut ctx = TestContext::default(); - protocol.on_connect(&mut ctx, peer_b, make_status(&status, vec![Role::Authority])); + protocol.on_connect(&mut ctx, peer_b, make_status(&status, Roles::AUTHORITY)); on_message(&mut protocol, &mut ctx, peer_b, Message::SessionKey(parent_hash, b_key)); assert!(!ctx.has_message(peer_b, Message::RequestBlockData(2, candidate_hash))); @@ -220,7 +217,7 @@ fn remove_bad_collator() { { let mut ctx = TestContext::default(); - protocol.on_connect(&mut ctx, peer_id, make_status(&status, vec![])); + protocol.on_connect(&mut ctx, peer_id, make_status(&status, Roles::NONE)); } { diff --git a/polkadot/primitives/src/parachain.rs b/polkadot/primitives/src/parachain.rs index 31ba646328511..ff7580067e101 100644 --- a/polkadot/primitives/src/parachain.rs +++ b/polkadot/primitives/src/parachain.rs @@ -224,6 +224,22 @@ pub struct Collation { pub receipt: CandidateReceipt, } +impl Decode for Collation { + fn decode(input: &mut I) -> Option { + Some(Collation { + block_data: Decode::decode(input)?, + receipt: Decode::decode(input)?, + }) + } +} + +impl Encode for Collation { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.block_data); + dest.push(&self.receipt); + } +} + /// Parachain ingress queue message. #[derive(PartialEq, Eq, Clone)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] @@ -253,6 +269,18 @@ impl BlockData { } } +impl Decode for BlockData { + fn decode(input: &mut I) -> Option { + Some(BlockData(Decode::decode(input)?)) + } +} + +impl Encode for BlockData { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.0); + } +} + /// Parachain header raw bytes wrapper type. #[derive(PartialEq, Eq)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index a9952261a082e..ed650ae680b17 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -52,7 +52,7 @@ use client::Client; use polkadot_network::{PolkadotProtocol, consensus::ConsensusNetwork}; use tokio::runtime::TaskExecutor; -pub use service::{Configuration, Role, PruningMode, ExtrinsicPoolOptions, +pub use service::{Configuration, Roles, PruningMode, ExtrinsicPoolOptions, ErrorKind, Error, ComponentBlock, LightComponents, FullComponents}; pub use client::ExecutionStrategy; @@ -166,7 +166,7 @@ pub fn new_light(config: Configuration, executor: TaskExecutor) pub fn new_full(config: Configuration, executor: TaskExecutor) -> Result>, Error> { - let is_validator = (config.roles & Role::AUTHORITY) == Role::AUTHORITY; + let is_validator = (config.roles & Roles::AUTHORITY) == Roles::AUTHORITY; let service = service::Service::>::new(config, executor.clone())?; // Spin consensus service if configured let consensus = if is_validator { diff --git a/polkadot/statement-table/Cargo.toml b/polkadot/statement-table/Cargo.toml index 2e9120a4f0939..b81ee1db907e1 100644 --- a/polkadot/statement-table/Cargo.toml +++ b/polkadot/statement-table/Cargo.toml @@ -7,5 +7,3 @@ authors = ["Parity Technologies "] substrate-codec = { path = "../../substrate/codec" } substrate-primitives = { path = "../../substrate/primitives" } polkadot-primitives = { path = "../primitives" } -serde = "1.0" -serde_derive = "1.0" diff --git a/polkadot/statement-table/src/generic.rs b/polkadot/statement-table/src/generic.rs index 56740288f8e74..06f9f94292799 100644 --- a/polkadot/statement-table/src/generic.rs +++ b/polkadot/statement-table/src/generic.rs @@ -70,7 +70,7 @@ pub trait Context { } /// Statements circulated among peers. -#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Debug, Clone)] pub enum Statement { /// Broadcast by a authority to indicate that this is his candidate for /// inclusion. @@ -141,7 +141,7 @@ impl Decode for Statement { } /// A signed statement. -#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Debug, Clone)] pub struct SignedStatement { /// The statement. pub statement: Statement, @@ -151,6 +151,23 @@ pub struct SignedStatement { pub sender: V, } +impl Encode for SignedStatement { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.statement); + dest.push(&self.signature); + dest.push(&self.sender); + } +} + +impl Decode for SignedStatement { + fn decode(value: &mut I) -> Option { + Some(SignedStatement { + statement: Decode::decode(value)?, + signature: Decode::decode(value)?, + sender: Decode::decode(value)?, + }) + } +} /// Misbehavior: voting more than one way on candidate validity. /// /// Since there are three possible ways to vote, a double vote is possible in diff --git a/polkadot/statement-table/src/lib.rs b/polkadot/statement-table/src/lib.rs index 779a7fc2df5fd..ecbe832b6a530 100644 --- a/polkadot/statement-table/src/lib.rs +++ b/polkadot/statement-table/src/lib.rs @@ -18,10 +18,6 @@ extern crate substrate_codec as codec; extern crate substrate_primitives; extern crate polkadot_primitives as primitives; -extern crate serde; -#[macro_use] -extern crate serde_derive; - pub mod generic; pub use generic::Table; diff --git a/substrate/network/Cargo.toml b/substrate/network/Cargo.toml index 930f5307c3d39..0185cf49f2216 100644 --- a/substrate/network/Cargo.toml +++ b/substrate/network/Cargo.toml @@ -9,23 +9,16 @@ authors = ["Parity Technologies "] [dependencies] log = "0.3" -rand = "0.3" parking_lot = "0.4" error-chain = "0.12" bitflags = "1.0" -serde = "1.0" -serde_derive = "1.0" -serde_json = "1.0" futures = "0.1.17" linked-hash-map = "0.5" ethcore-io = { git = "https://github.com/paritytech/parity.git" } ed25519 = { path = "../../substrate/ed25519" } substrate-primitives = { path = "../../substrate/primitives" } substrate-client = { path = "../../substrate/client" } -substrate-serializer = { path = "../../substrate/serializer" } -substrate-runtime-support = { path = "../../substrate/runtime-support" } substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" } -substrate-bft = { path = "../../substrate/bft" } substrate-codec = { path = "../../substrate/codec" } substrate-network-libp2p = { path = "../../substrate/network-libp2p" } diff --git a/substrate/network/src/config.rs b/substrate/network/src/config.rs index 7e21a5ded3b02..008b662b87291 100644 --- a/substrate/network/src/config.rs +++ b/substrate/network/src/config.rs @@ -14,19 +14,19 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see .? -pub use service::Role; +pub use service::Roles; /// Protocol configuration #[derive(Clone)] pub struct ProtocolConfig { /// Assigned roles. - pub roles: Role, + pub roles: Roles, } impl Default for ProtocolConfig { fn default() -> ProtocolConfig { ProtocolConfig { - roles: Role::FULL, + roles: Roles::FULL, } } } diff --git a/substrate/network/src/consensus_gossip.rs b/substrate/network/src/consensus_gossip.rs index 37925d302a33d..bff296acdeed3 100644 --- a/substrate/network/src/consensus_gossip.rs +++ b/substrate/network/src/consensus_gossip.rs @@ -25,6 +25,7 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; use runtime_primitives::generic::BlockId; use message::{self, generic::Message as GenericMessage}; use protocol::Context; +use service::Roles; // TODO: Add additional spam/DoS attack protection. const MESSAGE_LIFETIME: Duration = Duration::from_secs(600); @@ -73,8 +74,8 @@ impl ConsensusGossip where B::Header: HeaderT { } /// Handle new connected peer. - pub fn new_peer(&mut self, protocol: &mut Context, peer_id: PeerId, roles: &[message::Role]) { - if roles.iter().any(|r| *r == message::Role::Validator) { + pub fn new_peer(&mut self, protocol: &mut Context, peer_id: PeerId, roles: Roles) { + if roles.contains(Roles::AUTHORITY) { trace!(target:"gossip", "Registering authority {}", peer_id); // Send out all known messages. // TODO: limit by size diff --git a/substrate/network/src/lib.rs b/substrate/network/src/lib.rs index b0e3e7c7e46a0..9319918db15d7 100644 --- a/substrate/network/src/lib.rs +++ b/substrate/network/src/lib.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see .? +#![warn(unused_extern_crates)] #![warn(missing_docs)] //! Substrate-specific P2P networking: synchronizing blocks, propagating BFT messages. @@ -21,21 +22,14 @@ extern crate ethcore_io as core_io; extern crate linked_hash_map; -extern crate rand; extern crate parking_lot; extern crate substrate_primitives as primitives; -extern crate substrate_serializer as ser; extern crate substrate_client as client; -extern crate substrate_runtime_support as runtime_support; extern crate substrate_runtime_primitives as runtime_primitives; extern crate substrate_network_libp2p as network_libp2p; -extern crate substrate_bft; extern crate substrate_codec as codec; -extern crate serde; -extern crate serde_json; extern crate futures; extern crate ed25519; -#[macro_use] extern crate serde_derive; #[macro_use] extern crate log; #[macro_use] extern crate bitflags; #[macro_use] extern crate error_chain; @@ -67,5 +61,5 @@ pub use sync::{Status as SyncStatus, SyncState}; pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, PeerId, ProtocolId, ConnectionFilter, ConnectionDirection}; pub use message::{generic as generic_message, RequestId, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal, Status as StatusMessage}; pub use error::Error; -pub use config::{Role, ProtocolConfig}; +pub use config::{Roles, ProtocolConfig}; pub use on_demand::{OnDemand, OnDemandService, RemoteCallResponse}; diff --git a/substrate/network/src/message.rs b/substrate/network/src/message.rs index a0f863c406da4..341dccaa2a2b6 100644 --- a/substrate/network/src/message.rs +++ b/substrate/network/src/message.rs @@ -17,9 +17,8 @@ //! Network packet message types. These get serialized and put into the lower level protocol payload. use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; -use service::Role as RoleFlags; - -pub use self::generic::{BlockAnnounce, RemoteCallRequest, ConsensusVote, SignedConsensusVote, FromBlock, Body}; +use codec::{Encode, Decode, Input, Output}; +pub use self::generic::{BlockAnnounce, RemoteCallRequest, ConsensusVote, SignedConsensusVote, FromBlock}; /// A unique ID of a request. pub type RequestId = u64; @@ -86,76 +85,34 @@ pub type SignedConsensusMessage = generic::SignedConsensusProposal< /// A set of transactions. pub type Transactions = Vec; -/// Configured node role. -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -pub enum Role { - /// Full node with no additional responsibilities. - Full, - /// Light client. - Light, - /// Parachain validator. - Authority, - /// Same as `Authority` - Validator, -} - -impl Role { - /// Convert enum to service flags. - pub fn as_flags(roles: &[Role]) -> RoleFlags { - let mut flags = RoleFlags::NONE; - for r in roles { - match *r { - Role::Full => flags = flags | RoleFlags::FULL, - Role::Light => flags = flags | RoleFlags::LIGHT, - Role::Authority | Role::Validator => flags = flags | RoleFlags::AUTHORITY, - } - } - flags - } -} - -impl From for Vec where { - fn from(flags: RoleFlags) -> Vec { - let mut roles = Vec::new(); - if !(flags & RoleFlags::FULL).is_empty() { - roles.push(Role::Full); - } - if !(flags & RoleFlags::LIGHT).is_empty() { - roles.push(Role::Light); - } - if !(flags & RoleFlags::AUTHORITY).is_empty() { - roles.push(Role::Validator); - } - roles - } -} - /// Bits of block data and associated artefacts to request. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Copy, Clone)] -pub enum BlockAttribute { - /// Include block header. - Header, - /// Include block body. - Body, - /// Include block receipt. - Receipt, - /// Include block message queue. - MessageQueue, - /// Include a justification for the block. - Justification, +bitflags! { + /// Node roles bitmask. + pub struct BlockAttributes: u8 { + /// Include block header. + const HEADER = 0b00000001; + /// Include block body. + const BODY = 0b00000010; + /// Include block receipt. + const RECEIPT = 0b00000100; + /// Include block message queue. + const MESSAGE_QUEUE = 0b00001000; + /// Include a justification for the block. + const JUSTIFICATION = 0b00010000; + } } -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Clone, Copy)] /// Block enumeration direction. pub enum Direction { /// Enumerate in ascending order (from child to parent). - Ascending, + Ascending = 0, /// Enumerate in descendfing order (from parent to canonical child). - Descending, + Descending = 1, } /// Remote call response. -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct RemoteCallResponse { /// Id of a request this response was made for. pub id: RequestId, @@ -163,106 +120,75 @@ pub struct RemoteCallResponse { pub proof: Vec>, } +impl Encode for RemoteCallResponse { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.id); + dest.push(&self.proof); + } +} + +impl Decode for RemoteCallResponse { + fn decode(input: &mut I) -> Option { + Some(RemoteCallResponse { + id: Decode::decode(input)?, + proof: Decode::decode(input)?, + }) + } +} + /// Generic types. pub mod generic { use primitives::AuthorityId; - use codec::{Codec, Decode, Encode}; + use codec::{Decode, Encode, Input, Output}; use runtime_primitives::bft::Justification; use ed25519; - use primitives::Signature; - - use super::{Role, BlockAttribute, RemoteCallResponse, RequestId, Transactions, Direction}; - - use primitives::bytes; - - /// Emulates Poc-1 extrinsic primitive. - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] - pub struct V1Extrinsic(#[serde(with="bytes")] pub Vec); - // Alternative block format for poc-1 compatibility. - // TODO: remove this after poc-2 - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] - #[serde(untagged)] - /// Serialized block body type. - pub enum Body { - /// Poc-1. Extrinsics as bytes. - V1(Vec), - /// Poc-2 or later. A structured type. - Extrinsics(Vec), - } - - impl Body where Extrinsic: Codec { - /// Extracts extrinsic from the body. - pub fn to_extrinsics(self) -> Vec { - match self { - Body::Extrinsics(e) => e, - Body::V1(e) => { - e.into_iter().filter_map(|bytes| { - let bytes = bytes.0.encode(); - Decode::decode(&mut bytes.as_slice()) - }).collect() - } - } - } - } + use service::Roles; + use super::{BlockAttributes, RemoteCallResponse, RequestId, Transactions, Direction}; - /// Emulates Poc-1 justification format. - #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] - pub struct V1Justification { - /// The round consensus was reached in. - pub round_number: u32, - /// The hash of the header justified. - pub hash: H, - /// The signatures and signers of the hash. - pub signatures: Vec<([u8; 32], Signature)> - } - - // TODO: remove this after poc-2 - /// Justification back compat - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] - #[serde(untagged)] - pub enum BlockJustification { - /// Poc-1 format. - V1(V1Justification), - /// Poc-2 format. - V2(Justification), - } - - impl BlockJustification { - /// Convert to PoC-2 justification format. - pub fn to_justification(self) -> Justification { - match self { - BlockJustification::V2(j) => j, - BlockJustification::V1(j) => { - Justification { - round_number: j.round_number, - hash: j.hash, - signatures: j.signatures.into_iter().map(|(a, s)| (a.into(), s)).collect(), - } - } - } - } - } /// Block data sent in the response. - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] + #[derive(Debug, PartialEq, Eq, Clone)] pub struct BlockData { /// Block header hash. pub hash: Hash, /// Block header if requested. pub header: Option

, /// Block body if requested. - pub body: Option>, + pub body: Option>, /// Block receipt if requested. pub receipt: Option>, /// Block message queue if requested. pub message_queue: Option>, /// Justification if requested. - pub justification: Option>, + pub justification: Option>, + } + + impl Encode for BlockData { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.hash); + dest.push(&self.header); + dest.push(&self.body); + dest.push(&self.receipt); + dest.push(&self.message_queue); + dest.push(&self.justification); + } + } + + impl Decode for BlockData { + fn decode(input: &mut I) -> Option { + Some(BlockData { + hash: Decode::decode(input)?, + header: Decode::decode(input)?, + body: Decode::decode(input)?, + receipt: Decode::decode(input)?, + message_queue: Decode::decode(input)?, + justification: Decode::decode(input)?, + }) + } } /// Identifies starting point of a block sequence. - #[serde(untagged)] - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] + #[derive(Debug, PartialEq, Eq, Clone)] pub enum FromBlock { /// Start with given hash. Hash(Hash), @@ -270,8 +196,33 @@ pub mod generic { Number(Number), } + impl Encode for FromBlock { + fn encode_to(&self, dest: &mut T) { + match *self { + FromBlock::Hash(ref h) => { + dest.push_byte(0); + dest.push(h); + } + FromBlock::Number(ref n) => { + dest.push_byte(1); + dest.push(n); + } + } + } + } + + impl Decode for FromBlock { + fn decode(input: &mut I) -> Option { + match input.read_byte()? { + 0 => Some(FromBlock::Hash(Decode::decode(input)?)), + 1 => Some(FromBlock::Number(Decode::decode(input)?)), + _ => None, + } + } + } + /// Communication that can occur between participants in consensus. - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] + #[derive(Debug, PartialEq, Eq, Clone)] pub enum BftMessage { /// A consensus message (proposal or vote) Consensus(SignedConsensusMessage), @@ -279,8 +230,33 @@ pub mod generic { Auxiliary(Justification), } + impl Encode for BftMessage { + fn encode_to(&self, dest: &mut T) { + match *self { + BftMessage::Consensus(ref h) => { + dest.push_byte(0); + dest.push(h); + } + BftMessage::Auxiliary(ref n) => { + dest.push_byte(1); + dest.push(n); + } + } + } + } + + impl Decode for BftMessage { + fn decode(input: &mut I) -> Option { + match input.read_byte()? { + 0 => Some(BftMessage::Consensus(Decode::decode(input)?)), + 1 => Some(BftMessage::Auxiliary(Decode::decode(input)?)), + _ => None, + } + } + } + /// BFT Consensus message with parent header hash attached to it. - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] + #[derive(Debug, PartialEq, Eq, Clone)] pub struct LocalizedBftMessage { /// Consensus message. pub message: BftMessage, @@ -288,8 +264,24 @@ pub mod generic { pub parent_hash: Hash, } + impl Encode for LocalizedBftMessage { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.message); + dest.push(&self.parent_hash); + } + } + + impl Decode for LocalizedBftMessage { + fn decode(input: &mut I) -> Option { + Some(LocalizedBftMessage { + message: Decode::decode(input)?, + parent_hash: Decode::decode(input)?, + }) + } + } + /// A localized proposal message. Contains two signed pieces of data. - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] + #[derive(Debug, PartialEq, Eq, Clone)] pub struct SignedConsensusProposal { /// The round number. pub round_number: u32, @@ -305,8 +297,32 @@ pub mod generic { pub full_signature: ed25519::Signature, } + impl Encode for SignedConsensusProposal { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.round_number); + dest.push(&self.proposal); + dest.push(&self.digest); + dest.push(&self.sender); + dest.push(&self.digest_signature); + dest.push(&self.full_signature); + } + } + + impl Decode for SignedConsensusProposal { + fn decode(input: &mut I) -> Option { + Some(SignedConsensusProposal { + round_number: Decode::decode(input)?, + proposal: Decode::decode(input)?, + digest: Decode::decode(input)?, + sender: Decode::decode(input)?, + digest_signature: Decode::decode(input)?, + full_signature: Decode::decode(input)?, + }) + } + } + /// A localized vote message, including the sender. - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] + #[derive(Debug, PartialEq, Eq, Clone)] pub struct SignedConsensusVote { /// The message sent. pub vote: ConsensusVote, @@ -316,8 +332,26 @@ pub mod generic { pub signature: ed25519::Signature, } + impl Encode for SignedConsensusVote { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.vote); + dest.push(&self.sender); + dest.push(&self.signature); + } + } + + impl Decode for SignedConsensusVote { + fn decode(input: &mut I) -> Option { + Some(SignedConsensusVote { + vote: Decode::decode(input)?, + sender: Decode::decode(input)?, + signature: Decode::decode(input)?, + }) + } + } + /// Votes during a consensus round. - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] + #[derive(Debug, PartialEq, Eq, Clone)] pub enum ConsensusVote { /// Prepare to vote for proposal with digest D. Prepare(u32, H), @@ -327,8 +361,40 @@ pub mod generic { AdvanceRound(u32), } + impl Encode for ConsensusVote { + fn encode_to(&self, dest: &mut T) { + match *self { + ConsensusVote::Prepare(ref r, ref h) => { + dest.push_byte(0); + dest.push(r); + dest.push(h); + } + ConsensusVote::Commit(ref r, ref h) => { + dest.push_byte(1); + dest.push(r); + dest.push(h); + } + ConsensusVote::AdvanceRound(ref r) => { + dest.push_byte(2); + dest.push(r); + } + } + } + } + + impl Decode for ConsensusVote { + fn decode(input: &mut I) -> Option { + match input.read_byte()? { + 0 => Some(ConsensusVote::Prepare(Decode::decode(input)?, Decode::decode(input)?)), + 1 => Some(ConsensusVote::Commit(Decode::decode(input)?, Decode::decode(input)?)), + 2 => Some(ConsensusVote::AdvanceRound(Decode::decode(input)?)), + _ => None, + } + } + } + /// A localized message. - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] + #[derive(Debug, PartialEq, Eq, Clone)] pub enum SignedConsensusMessage { /// A proposal. Propose(SignedConsensusProposal), @@ -336,8 +402,33 @@ pub mod generic { Vote(SignedConsensusVote), } + impl Encode for SignedConsensusMessage { + fn encode_to(&self, dest: &mut T) { + match *self { + SignedConsensusMessage::Propose(ref m) => { + dest.push_byte(0); + dest.push(m); + } + SignedConsensusMessage::Vote(ref m) => { + dest.push_byte(1); + dest.push(m); + } + } + } + } + + impl Decode for SignedConsensusMessage { + fn decode(input: &mut I) -> Option { + match input.read_byte()? { + 0 => Some(SignedConsensusMessage::Propose(Decode::decode(input)?)), + 1 => Some(SignedConsensusMessage::Vote(Decode::decode(input)?)), + _ => None, + } + } + } + /// A network message. - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] + #[derive(Debug, PartialEq, Eq, Clone)] pub enum Message { /// Status packet. Status(Status), @@ -359,13 +450,77 @@ pub mod generic { ChainSpecific(Vec), } + impl Encode + for Message + { + fn encode_to(&self, dest: &mut T) { + match *self { + Message::Status(ref m) => { + dest.push_byte(0); + dest.push(m); + } + Message::BlockRequest(ref m) => { + dest.push_byte(1); + dest.push(m); + } + Message::BlockResponse(ref m) => { + dest.push_byte(2); + dest.push(m); + } + Message::BlockAnnounce(ref m) => { + dest.push_byte(3); + dest.push(m); + } + Message::Transactions(ref m) => { + dest.push_byte(4); + dest.push(m); + } + Message::BftMessage(ref m) => { + dest.push_byte(5); + dest.push(m); + } + Message::RemoteCallRequest(ref m) => { + dest.push_byte(6); + dest.push(m); + } + Message::RemoteCallResponse(ref m) => { + dest.push_byte(7); + dest.push(m); + } + Message::ChainSpecific(ref m) => { + dest.push_byte(255); + dest.push(m); + } + } + } + } + + impl Decode + for Message + { + fn decode(input: &mut I) -> Option { + match input.read_byte()? { + 0 => Some(Message::Status(Decode::decode(input)?)), + 1 => Some(Message::BlockRequest(Decode::decode(input)?)), + 2 => Some(Message::BlockResponse(Decode::decode(input)?)), + 3 => Some(Message::BlockAnnounce(Decode::decode(input)?)), + 4 => Some(Message::Transactions(Decode::decode(input)?)), + 5 => Some(Message::BftMessage(Decode::decode(input)?)), + 6 => Some(Message::RemoteCallResponse(Decode::decode(input)?)), + 7 => Some(Message::RemoteCallResponse(Decode::decode(input)?)), + 255 => Some(Message::ChainSpecific(Decode::decode(input)?)), + _ => None, + } + } + } + /// Status sent on connection. - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] + #[derive(Debug, PartialEq, Eq, Clone)] pub struct Status { /// Protocol version. pub version: u32, /// Supported roles. - pub roles: Vec, + pub roles: Roles, /// Best block number. pub best_number: Number, /// Best block hash. @@ -373,23 +528,40 @@ pub mod generic { /// Genesis block hash. pub genesis_hash: Hash, /// Chain-specific status. - #[serde(skip)] pub chain_status: Vec, - /// Signatue of `best_hash` made with validator address. Required for the validator role. - pub validator_signature: Option, - /// Validator address. Required for the validator role. - pub validator_id: Option, - /// Parachain id. Required for the collator role. - pub parachain_id: Option, } + impl Encode for Status { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.version); + dest.push_byte(self.roles.bits()); + dest.push(&self.best_number); + dest.push(&self.best_hash); + dest.push(&self.genesis_hash); + dest.push(&self.chain_status); + } + } + + impl Decode for Status { + fn decode(input: &mut I) -> Option { + Some(Status { + version: Decode::decode(input)?, + roles: Roles::from_bits(input.read_byte()?)?, + best_number: Decode::decode(input)?, + best_hash: Decode::decode(input)?, + genesis_hash: Decode::decode(input)?, + chain_status: Decode::decode(input)?, + }) + } + } + /// Request block data from a peer. - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] + #[derive(Debug, PartialEq, Eq, Clone)] pub struct BlockRequest { /// Unique request id. pub id: RequestId, /// Bits of block data to request. - pub fields: Vec, + pub fields: BlockAttributes, /// Start from this block. pub from: FromBlock, /// End at this block. An implementation defined maximum is used when unspecified. @@ -400,8 +572,36 @@ pub mod generic { pub max: Option, } + impl Encode for BlockRequest { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.id); + dest.push_byte(self.fields.bits()); + dest.push(&self.from); + dest.push(&self.to); + dest.push_byte(self.direction as u8); + dest.push(&self.max); + } + } + + impl Decode for BlockRequest { + fn decode(input: &mut I) -> Option { + Some(BlockRequest { + id: Decode::decode(input)?, + fields: BlockAttributes::from_bits(input.read_byte()?)?, + from: Decode::decode(input)?, + to: Decode::decode(input)?, + direction: match input.read_byte()? { + x if x == Direction::Ascending as u8 => Some(Direction::Ascending), + x if x == Direction::Descending as u8 => Some(Direction::Descending), + _ => None, + }?, + max: Decode::decode(input)?, + }) + } + } + /// Response to `BlockRequest` - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] + #[derive(Debug, PartialEq, Eq, Clone)] pub struct BlockResponse { /// Id of a request this response was made for. pub id: RequestId, @@ -409,14 +609,44 @@ pub mod generic { pub blocks: Vec>, } + impl Encode for BlockResponse { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.id); + dest.push(&self.blocks) + } + } + + impl Decode for BlockResponse { + fn decode(input: &mut I) -> Option { + Some(BlockResponse { + id: Decode::decode(input)?, + blocks: Decode::decode(input)?, + }) + } + } + /// Announce a new complete relay chain block on the network. - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] + #[derive(Debug, PartialEq, Eq, Clone)] pub struct BlockAnnounce { /// New block header. pub header: H, } - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] + impl Encode for BlockAnnounce
{ + fn encode_to(&self, dest: &mut T) { + dest.push(&self.header); + } + } + + impl Decode for BlockAnnounce
{ + fn decode(input: &mut I) -> Option { + Some(BlockAnnounce { + header: Decode::decode(input)?, + }) + } + } + + #[derive(Debug, PartialEq, Eq, Clone)] /// Remote call request. pub struct RemoteCallRequest { /// Unique request id. @@ -428,4 +658,24 @@ pub mod generic { /// Call data. pub data: Vec, } + + impl Encode for RemoteCallRequest { + fn encode_to(&self, dest: &mut T) { + dest.push(&self.id); + dest.push(&self.block); + dest.push(self.method.as_bytes()); + dest.push(&self.data); + } + } + + impl Decode for RemoteCallRequest { + fn decode(input: &mut I) -> Option { + Some(RemoteCallRequest { + id: Decode::decode(input)?, + block: Decode::decode(input)?, + method: String::from_utf8_lossy(&Vec::decode(input)?).into(), + data: Decode::decode(input)?, + }) + } + } } diff --git a/substrate/network/src/on_demand.rs b/substrate/network/src/on_demand.rs index 2011462622feb..da022e91b1219 100644 --- a/substrate/network/src/on_demand.rs +++ b/substrate/network/src/on_demand.rs @@ -38,7 +38,7 @@ const REQUEST_TIMEOUT: Duration = Duration::from_secs(15); /// On-demand service API. pub trait OnDemandService: Send + Sync { /// When new node is connected. - fn on_connect(&self, peer: PeerId, role: service::Role); + fn on_connect(&self, peer: PeerId, role: service::Roles); /// When node is disconnected. fn on_disconnect(&self, peer: PeerId); @@ -168,8 +168,8 @@ impl OnDemandService for OnDemand where E: service::ExecuteInContext, B::Header: HeaderT, { - fn on_connect(&self, peer: PeerId, role: service::Role) { - if !role.intersects(service::Role::FULL | service::Role::AUTHORITY) { // TODO: correct? + fn on_connect(&self, peer: PeerId, role: service::Roles) { + if !role.intersects(service::Roles::FULL | service::Roles::AUTHORITY) { // TODO: correct? return; } @@ -326,7 +326,7 @@ mod tests { use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest}; use message; use network_libp2p::PeerId; - use service::{Role, ExecuteInContext}; + use service::{Roles, ExecuteInContext}; use test::TestIo; use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService}; use test_client::runtime::{Block, Hash}; @@ -372,16 +372,16 @@ mod tests { #[test] fn knows_about_peers_roles() { let (_, on_demand) = dummy(true); - on_demand.on_connect(0, Role::LIGHT); - on_demand.on_connect(1, Role::FULL); - on_demand.on_connect(2, Role::AUTHORITY); + on_demand.on_connect(0, Roles::LIGHT); + on_demand.on_connect(1, Roles::FULL); + on_demand.on_connect(2, Roles::AUTHORITY); assert_eq!(vec![1, 2], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); } #[test] fn disconnects_from_idle_peer() { let (_, on_demand) = dummy(true); - on_demand.on_connect(0, Role::FULL); + on_demand.on_connect(0, Roles::FULL); assert_eq!(1, total_peers(&*on_demand)); on_demand.on_disconnect(0); assert_eq!(0, total_peers(&*on_demand)); @@ -393,8 +393,8 @@ mod tests { let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); - on_demand.on_connect(0, Role::FULL); - on_demand.on_connect(1, Role::FULL); + on_demand.on_connect(0, Roles::FULL); + on_demand.on_connect(1, Roles::FULL); assert_eq!(vec![0, 1], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); assert!(on_demand.core.lock().active_peers.is_empty()); @@ -414,7 +414,7 @@ mod tests { let (_x, on_demand) = dummy(true); let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); - on_demand.on_connect(0, Role::FULL); + on_demand.on_connect(0, Roles::FULL); on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] }); receive_call_response(&*on_demand, &mut network, 0, 1); @@ -429,7 +429,7 @@ mod tests { let mut network = TestIo::new(&queue, None); on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] }); - on_demand.on_connect(0, Role::FULL); + on_demand.on_connect(0, Roles::FULL); receive_call_response(&*on_demand, &mut network, 0, 0); assert!(network.to_disconnect.contains(&0)); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); @@ -440,7 +440,7 @@ mod tests { let (_x, on_demand) = dummy(true); let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); - on_demand.on_connect(0, Role::FULL); + on_demand.on_connect(0, Roles::FULL); receive_call_response(&*on_demand, &mut network, 0, 0); assert!(network.to_disconnect.contains(&0)); @@ -451,7 +451,7 @@ mod tests { let (_x, on_demand) = dummy(true); let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); - on_demand.on_connect(0, Role::FULL); + on_demand.on_connect(0, Roles::FULL); let response = on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] }); let thread = ::std::thread::spawn(move || { diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index a328c8bb4c663..ff35f6a190def 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -19,16 +19,16 @@ use std::{mem, cmp}; use std::sync::Arc; use std::time; use parking_lot::RwLock; -use serde_json; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, As}; use runtime_primitives::generic::BlockId; use network_libp2p::PeerId; +use codec::{Encode, Decode}; use message::{self, Message}; use message::generic::Message as GenericMessage; use specialization::Specialization; use sync::{ChainSync, Status as SyncStatus, SyncState}; -use service::{Role, TransactionPool}; +use service::{Roles, TransactionPool}; use config::ProtocolConfig; use chain::Client; use on_demand::OnDemandService; @@ -38,7 +38,7 @@ use error; const REQUEST_TIMEOUT_SEC: u64 = 40; /// Current protocol version. -pub (crate) const CURRENT_VERSION: u32 = 0; +pub (crate) const CURRENT_VERSION: u32 = 1; /// Current packet count. pub (crate) const CURRENT_PACKET_COUNT: u8 = 1; @@ -74,7 +74,7 @@ struct Peer { /// Protocol version protocol_version: u32, /// Roles - roles: Role, + roles: Roles, /// Peer best block hash best_hash: B::Hash, /// Peer best block number @@ -95,7 +95,7 @@ struct Peer { #[derive(Debug)] pub struct PeerInfo { /// Roles - pub roles: Role, + pub roles: Roles, /// Protocol version pub protocol_version: u32, /// Peer best block hash @@ -233,12 +233,12 @@ impl> Protocol { } } - pub fn handle_packet(&self, io: &mut SyncIo, peer_id: PeerId, data: &[u8]) { - let message: Message = match serde_json::from_slice(data) { - Ok(m) => m, - Err(e) => { - trace!(target: "sync", "Invalid packet: {}", String::from_utf8_lossy(data)); - io.disable_peer(peer_id, &format!("Peer sent us a packet with invalid format ({})", e)); + pub fn handle_packet(&self, io: &mut SyncIo, peer_id: PeerId, mut data: &[u8]) { + let message: Message = match Decode::decode(&mut data) { + Some(m) => m, + None => { + trace!(target: "sync", "Invalid packet from {}", peer_id); + io.disable_peer(peer_id, "Peer sent us a packet with invalid format"); return; } }; @@ -319,16 +319,9 @@ impl> Protocol { }; let max = cmp::min(request.max.unwrap_or(u32::max_value()), MAX_BLOCK_DATA_RESPONSE) as usize; // TODO: receipts, etc. - let (mut get_header, mut get_body, mut get_justification) = (false, false, false); - for a in request.fields { - match a { - message::BlockAttribute::Header => get_header = true, - message::BlockAttribute::Body => get_body = true, - message::BlockAttribute::Receipt => unimplemented!(), - message::BlockAttribute::MessageQueue => unimplemented!(), - message::BlockAttribute::Justification => get_justification = true, - } - } + let get_header = request.fields.contains(message::BlockAttributes::HEADER); + let get_body = request.fields.contains(message::BlockAttributes::BODY); + let get_justification = request.fields.contains(message::BlockAttributes::JUSTIFICATION); while let Some(header) = self.context_data.chain.header(&id).unwrap_or(None) { if blocks.len() >= max{ break; @@ -339,10 +332,10 @@ impl> Protocol { let block_data = message::generic::BlockData { hash: hash, header: if get_header { Some(header) } else { None }, - body: (if get_body { self.context_data.chain.body(&BlockId::Hash(hash)).unwrap_or(None) } else { None }).map(|body| message::Body::Extrinsics(body)), + body: if get_body { self.context_data.chain.body(&BlockId::Hash(hash)).unwrap_or(None) } else { None }, receipt: None, message_queue: None, - justification: justification.map(|j| message::generic::BlockJustification::V2(j)), + justification, }; blocks.push(block_data); match request.direction { @@ -435,7 +428,7 @@ impl> Protocol { let peer = Peer { protocol_version: status.version, - roles: message::Role::as_flags(&status.roles), + roles: status.roles, best_hash: status.best_hash, best_number: status.best_number, block_request: None, @@ -452,7 +445,7 @@ impl> Protocol { let mut context = ProtocolContext::new(&self.context_data, io); self.sync.write().new_peer(&mut context, peer_id); self.specialization.write().on_connect(&mut context, peer_id, status.clone()); - self.on_demand.as_ref().map(|s| s.on_connect(peer_id, message::Role::as_flags(&status.roles))); + self.on_demand.as_ref().map(|s| s.on_connect(peer_id, status.roles)); } /// Called when peer sends us new extrinsics @@ -521,10 +514,6 @@ impl> Protocol { best_number: info.chain.best_number, best_hash: info.chain.best_hash, chain_status: self.specialization.read().status(), - - parachain_id: None, - validator_id: None, - validator_signature: None, }; self.send_message(io, peer_id, GenericMessage::Status(status)) } @@ -562,7 +551,7 @@ impl> Protocol { ); // blocks are not announced by light clients - if self.config.roles & Role::LIGHT == Role::LIGHT { + if self.config.roles & Roles::LIGHT == Roles::LIGHT { return; } @@ -621,7 +610,7 @@ fn send_message(peers: &RwLock>>, io: &mut Sy }, _ => (), } - let data = serde_json::to_vec(&message).expect("Serializer is infallible; qed"); + let data = message.encode(); if let Err(e) = io.send(peer_id, data) { debug!(target:"sync", "Error sending message: {:?}", e); io.disconnect_peer(peer_id); @@ -630,6 +619,6 @@ fn send_message(peers: &RwLock>>, io: &mut Sy /// Hash a message. pub(crate) fn hash_message(message: &Message) -> B::Hash { - let data = serde_json::to_vec(&message).expect("Serializer is infallible; qed"); + let data = message.encode(); HashFor::::hash(&data) } diff --git a/substrate/network/src/service.rs b/substrate/network/src/service.rs index 8bcd800371835..ea990a304a58c 100644 --- a/substrate/network/src/service.rs +++ b/substrate/network/src/service.rs @@ -46,7 +46,7 @@ const PROPAGATE_TIMEOUT: Duration = Duration::from_millis(5000); bitflags! { /// Node roles bitmask. - pub struct Role: u32 { + pub struct Roles: u8 { /// No network. const NONE = 0b00000000; /// Full node, does not participate in consensus. diff --git a/substrate/network/src/sync.rs b/substrate/network/src/sync.rs index 620ef966b0ccf..a54b920a39693 100644 --- a/substrate/network/src/sync.rs +++ b/substrate/network/src/sync.rs @@ -22,7 +22,7 @@ use blocks::{self, BlockCollection}; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor}; use runtime_primitives::generic::BlockId; use message::{self, generic::Message as GenericMessage}; -use service::Role; +use service::Roles; // Maximum blocks to request in a single packet. const MAX_BLOCKS_TO_REQUEST: usize = 128; @@ -50,7 +50,7 @@ pub struct ChainSync { blocks: BlockCollection, best_queued_number: NumberFor, best_queued_hash: B::Hash, - required_block_attributes: Vec, + required_block_attributes: message::BlockAttributes, } /// Reported sync state. @@ -73,13 +73,10 @@ pub struct Status { impl ChainSync { /// Create a new instance. - pub(crate) fn new(role: Role, info: &ClientInfo) -> Self { - let mut required_block_attributes = vec![ - message::BlockAttribute::Header, - message::BlockAttribute::Justification - ]; - if role.intersects(Role::FULL) { - required_block_attributes.push(message::BlockAttribute::Body); + pub(crate) fn new(role: Roles, info: &ClientInfo) -> Self { + let mut required_block_attributes = message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION; + if role.intersects(Roles::FULL) { + required_block_attributes |= message::BlockAttributes::BODY; } ChainSync { @@ -88,7 +85,7 @@ impl ChainSync { blocks: BlockCollection::new(), best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash), best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number), - required_block_attributes: required_block_attributes, + required_block_attributes, } } @@ -253,8 +250,8 @@ impl ChainSync { let result = protocol.client().import( is_best, header, - justification.to_justification(), - block.body.map(|b| b.to_extrinsics()), + justification, + block.body, ); match result { Ok(ImportResult::AlreadyInChain) => { @@ -447,7 +444,7 @@ impl ChainSync { trace!(target: "sync", "Requesting ancestry block #{} from {}", block, peer_id); let request = message::generic::BlockRequest { id: 0, - fields: vec![message::BlockAttribute::Header, message::BlockAttribute::Justification], + fields: message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION, from: message::FromBlock::Number(block), to: None, direction: message::Direction::Ascending, diff --git a/substrate/network/src/test/sync.rs b/substrate/network/src/test/sync.rs index d67d530cce935..7297c239f3968 100644 --- a/substrate/network/src/test/sync.rs +++ b/substrate/network/src/test/sync.rs @@ -17,11 +17,10 @@ use client::backend::Backend; use client::blockchain::HeaderBackend as BlockchainHeaderBackend; use sync::SyncState; -use {Role}; +use Roles; use super::*; #[test] -#[ignore] fn sync_from_two_peers_works() { ::env_logger::init().ok(); let mut net = TestNet::new(3); @@ -34,7 +33,6 @@ fn sync_from_two_peers_works() { } #[test] -#[ignore] fn sync_from_two_peers_with_ancestry_search_works() { ::env_logger::init().ok(); let mut net = TestNet::new(3); @@ -47,7 +45,6 @@ fn sync_from_two_peers_with_ancestry_search_works() { } #[test] -#[ignore] fn sync_long_chain_works() { let mut net = TestNet::new(2); net.peer(1).push_blocks(500, false); @@ -68,7 +65,6 @@ fn sync_no_common_longer_chain_fails() { } #[test] -#[ignore] fn sync_after_fork_works() { ::env_logger::init().ok(); let mut net = TestNet::new(3); @@ -99,7 +95,7 @@ fn blocks_are_not_announced_by_light_nodes() { // full peer0 is connected to light peer // light peer1 is connected to full peer2 let mut light_config = ProtocolConfig::default(); - light_config.roles = Role::LIGHT; + light_config.roles = Roles::LIGHT; net.add_peer(&ProtocolConfig::default()); net.add_peer(&light_config); net.add_peer(&ProtocolConfig::default()); diff --git a/substrate/service/src/config.rs b/substrate/service/src/config.rs index 9f47bf0c441a0..58a3db7052e69 100644 --- a/substrate/service/src/config.rs +++ b/substrate/service/src/config.rs @@ -19,7 +19,7 @@ use extrinsic_pool; use chain_spec::ChainSpec; pub use client::ExecutionStrategy; -pub use network::Role; +pub use network::Roles; pub use network::NetworkConfiguration; pub use client_db::PruningMode; use runtime_primitives::BuildStorage; @@ -28,7 +28,7 @@ use serde::{Serialize, de::DeserializeOwned}; /// Service configuration. pub struct Configuration { /// Node roles. - pub roles: Role, + pub roles: Roles, /// Extrinsic pool configuration. pub extrinsic_pool: extrinsic_pool::txpool::Options, /// Network configuration. @@ -57,7 +57,7 @@ impl Configuration { let mut configuration = Configuration { chain_spec, name: Default::default(), - roles: Role::FULL, + roles: Roles::FULL, extrinsic_pool: Default::default(), network: Default::default(), keystore_path: Default::default(), diff --git a/substrate/service/src/lib.rs b/substrate/service/src/lib.rs index c4c641a9520cd..311b256e1a4bc 100644 --- a/substrate/service/src/lib.rs +++ b/substrate/service/src/lib.rs @@ -59,7 +59,7 @@ use exit_future::Signal; use tokio::runtime::TaskExecutor; pub use self::error::{ErrorKind, Error}; -pub use config::{Configuration, Role, PruningMode}; +pub use config::{Configuration, Roles, PruningMode}; pub use chain_spec::ChainSpec; pub use extrinsic_pool::txpool::{Options as ExtrinsicPoolOptions}; pub use extrinsic_pool::api::{ExtrinsicPool as ExtrinsicPoolApi};