Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

[v0.3] Integrates new gossip system into Polkadot #166

Merged
merged 2 commits into from
Mar 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 63 additions & 58 deletions Cargo.lock

Large diffs are not rendered by default.

61 changes: 38 additions & 23 deletions network/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
//! This fulfills the `polkadot_consensus::Network` trait, providing a hook to be called
//! each time consensus begins on a new chain head.

use sr_primitives::traits::ProvideRuntimeApi;
use substrate_network::consensus_gossip::ConsensusMessage;
use polkadot_consensus::{Network, SharedTable, Collators, Statement, GenericStatement};
use sr_primitives::traits::{BlakeTwo256, Hash as HashT, ProvideRuntimeApi};
use polkadot_consensus::{
Network, SharedTable, Collators, Statement, GenericStatement,
};
use polkadot_primitives::{AccountId, Block, Hash, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, BlockData};
use codec::Decode;
Expand All @@ -39,12 +40,20 @@ use parking_lot::Mutex;

use super::NetworkService;
use router::Router;
use gossip::{POLKADOT_ENGINE_ID, GossipMessage, RegisteredMessageValidator, MessageValidationData};

/// Compute the gossip topic used for attestations on this relay parent hash.
pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
let mut v = parent_hash.as_ref().to_vec();
v.extend(b"attestations");

BlakeTwo256::hash(&v[..])
}

// task that processes all gossipped consensus messages,
// checking signatures
struct MessageProcessTask<P, E> {
inner_stream: mpsc::UnboundedReceiver<ConsensusMessage>,
parent_hash: Hash,
inner_stream: mpsc::UnboundedReceiver<Vec<u8>>,
table_router: Router<P>,
exit: E,
}
Expand All @@ -54,19 +63,12 @@ impl<P, E> MessageProcessTask<P, E> where
P::Api: ParachainHost<Block>,
E: Future<Item=(),Error=()> + Clone + Send + 'static,
{
fn process_message(&self, msg: ConsensusMessage) -> Option<Async<()>> {
use polkadot_consensus::SignedStatement;

fn process_message(&self, msg: Vec<u8>) -> Option<Async<()>> {
debug!(target: "consensus", "Processing consensus statement for live consensus");
if let Some(statement) = SignedStatement::decode(&mut msg.as_slice()) {
if ::polkadot_consensus::check_statement(
&statement.statement,
&statement.signature,
statement.sender,
&self.parent_hash
) {
self.table_router.import_statement(statement, self.exit.clone());
}

// statements are already checked by gossip validator.
if let Some(message) = GossipMessage::decode(&mut &msg[..]) {
self.table_router.import_statement(message.statement, self.exit.clone());
}

None
Expand Down Expand Up @@ -99,13 +101,19 @@ impl<P, E> Future for MessageProcessTask<P, E> where
pub struct ConsensusNetwork<P, E> {
network: Arc<NetworkService>,
api: Arc<P>,
message_validator: RegisteredMessageValidator,
exit: E,
}

impl<P, E> ConsensusNetwork<P, E> {
/// Create a new consensus networking object.
pub fn new(network: Arc<NetworkService>, exit: E, api: Arc<P>) -> Self {
ConsensusNetwork { network, exit, api }
pub fn new(
network: Arc<NetworkService>,
exit: E,
message_validator: RegisteredMessageValidator,
api: Arc<P>,
) -> Self {
ConsensusNetwork { network, exit, message_validator, api }
}
}

Expand All @@ -115,6 +123,7 @@ impl<P, E: Clone> Clone for ConsensusNetwork<P, E> {
network: self.network.clone(),
exit: self.exit.clone(),
api: self.api.clone(),
message_validator: self.message_validator.clone(),
}
}
}
Expand All @@ -130,7 +139,7 @@ impl<P, E> Network for ConsensusNetwork<P,E> where
/// Instantiate a table router using the given shared table.
fn communication_for(
&self,
_validators: &[SessionKey],
authorities: &[SessionKey],
table: Arc<SharedTable>,
task_executor: TaskExecutor,
) -> Self::TableRouter {
Expand All @@ -146,14 +155,21 @@ impl<P, E> Network for ConsensusNetwork<P,E> where
task_executor.clone(),
parent_hash,
knowledge.clone(),
self.message_validator.clone(),
);

let attestation_topic = table_router.gossip_topic();
let exit = self.exit.clone();

// before requesting messages, note live consensus session.
self.message_validator.note_consensus(
parent_hash,
MessageValidationData { authorities: authorities.to_vec() },
);

let (tx, rx) = std::sync::mpsc::channel();
self.network.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(attestation_topic);
let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, attestation_topic);
let _ = tx.send(inner_rx);
});

Expand All @@ -168,7 +184,6 @@ impl<P, E> Network for ConsensusNetwork<P,E> where
let inner_stream = rx.try_recv().expect("1. The with_gossip closure executed first, 2. the reply should be available");
let process_task = MessageProcessTask {
inner_stream,
parent_hash,
table_router: table_router_clone,
exit,
};
Expand Down Expand Up @@ -200,7 +215,7 @@ impl Future for AwaitingCollation {
.map_err(|_| NetworkDown)
}
match self.outer.poll() {
Ok(futures::Async::Ready(mut inner)) => {
Ok(futures::Async::Ready(inner)) => {
self.inner = Some(inner);
self.poll()
},
Expand Down
159 changes: 159 additions & 0 deletions network/src/gossip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Gossip messages and the message validator

use substrate_network::consensus_gossip::{
self as network_gossip, ValidationResult as GossipValidationResult,
};
use polkadot_consensus::SignedStatement;
use polkadot_primitives::{Hash, SessionKey};
use codec::Decode;

use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;

use super::NetworkService;

/// The engine ID of the polkadot attestation system.
pub const POLKADOT_ENGINE_ID: substrate_network::ConsensusEngineId = [b'd', b'o', b't', b'1'];

/// A gossip message.
#[derive(Encode, Decode, Clone)]
pub(crate) struct GossipMessage {
/// The relay chain parent hash.
pub(crate) relay_parent: Hash,
/// The signed statement being gossipped.
pub(crate) statement: SignedStatement,
}

/// whether a block is known.
pub enum Known {
/// The block is a known leaf.
Leaf,
/// The block is known to be old.
Old,
/// The block is known to be bad.
Bad,
}

/// An oracle for known blocks.
pub trait KnownOracle: Send + Sync {
/// whether a block is known. If it's not, returns `None`.
fn is_known(&self, block_hash: &Hash) -> Option<Known>;
}

impl<F> KnownOracle for F where F: Fn(&Hash) -> Option<Known> + Send + Sync {
fn is_known(&self, block_hash: &Hash) -> Option<Known> {
(self)(block_hash)
}
}

/// Register a gossip validator on the network service.
///
/// This returns a `RegisteredMessageValidator`
// NOTE: since RegisteredMessageValidator is meant to be a type-safe proof
// that we've actually done the registration, this should be the only way
// to construct it outside of tests.
pub fn register_validator<O: KnownOracle + 'static>(
service: &NetworkService,
oracle: O,
) -> RegisteredMessageValidator {
let validator = Arc::new(MessageValidator {
live_consensus: RwLock::new(HashMap::new()),
oracle,
});

let gossip_side = validator.clone();
service.with_gossip(|gossip, _| gossip.register_validator(POLKADOT_ENGINE_ID, gossip_side));

RegisteredMessageValidator { inner: validator as _ }
}

/// A registered message validator.
///
/// Create this using `register_validator`.
#[derive(Clone)]
pub struct RegisteredMessageValidator {
inner: Arc<MessageValidator<KnownOracle>>,
}

impl RegisteredMessageValidator {
/// Note a live consensus session. This must be removed later with
/// `remove_consensus`.
pub(crate) fn note_consensus(&self, relay_parent: Hash, validation: MessageValidationData) {
self.inner.live_consensus.write().insert(relay_parent, validation);
}

/// Remove a live consensus session when it is no longer live.
pub(crate) fn remove_consensus(&self, relay_parent: &Hash) {
self.inner.live_consensus.write().remove(relay_parent);
}
}

// data needed for validating gossip.
pub(crate) struct MessageValidationData {
/// The authorities at a block.
pub(crate) authorities: Vec<SessionKey>,
}

impl MessageValidationData {
fn check_statement(&self, relay_parent: &Hash, statement: &SignedStatement) -> bool {
self.authorities.contains(&statement.sender) &&
::polkadot_consensus::check_statement(
&statement.statement,
&statement.signature,
statement.sender,
relay_parent,
)
}
}

/// An unregistered message validator. Register this with `register_validator`.
pub struct MessageValidator<O: ?Sized> {
live_consensus: RwLock<HashMap<Hash, MessageValidationData>>,
oracle: O,
}

impl<O: KnownOracle + ?Sized> network_gossip::Validator<Hash> for MessageValidator<O> {
fn validate(&self, mut data: &[u8]) -> GossipValidationResult<Hash> {
match GossipMessage::decode(&mut data) {
Some(GossipMessage { relay_parent, statement }) => {
let live = self.live_consensus.read();
let topic = || ::consensus::attestation_topic(relay_parent.clone());
if let Some(validation) = live.get(&relay_parent) {
if validation.check_statement(&relay_parent, &statement) {
GossipValidationResult::Valid(topic())
} else {
GossipValidationResult::Invalid
}
} else {
match self.oracle.is_known(&relay_parent) {
None | Some(Known::Leaf) => GossipValidationResult::Future(topic()),
Some(Known::Old) => GossipValidationResult::Expired,
Some(Known::Bad) => GossipValidationResult::Invalid,
}
}
}
None => {
debug!(target: "consensus", "Error decoding gossip message");
GossipValidationResult::Invalid
}
}
}
}
1 change: 1 addition & 0 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ mod collator_pool;
mod local_collations;
mod router;
pub mod consensus;
pub mod gossip;

use codec::{Decode, Encode};
use futures::sync::oneshot;
Expand Down
Loading