Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
[v0.3] Integrates new gossip system into Polkadot (#166)
Browse files Browse the repository at this point in the history
* new gossip validation in network

* integrate new gossip into service
  • Loading branch information
rphmeier authored and arkpar committed Mar 4, 2019
1 parent 2dd861f commit 8b07458
Show file tree
Hide file tree
Showing 8 changed files with 318 additions and 97 deletions.
121 changes: 63 additions & 58 deletions Cargo.lock

Large diffs are not rendered by default.

61 changes: 38 additions & 23 deletions network/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
//! This fulfills the `polkadot_consensus::Network` trait, providing a hook to be called
//! each time consensus begins on a new chain head.

use sr_primitives::traits::ProvideRuntimeApi;
use substrate_network::consensus_gossip::ConsensusMessage;
use polkadot_consensus::{Network, SharedTable, Collators, Statement, GenericStatement};
use sr_primitives::traits::{BlakeTwo256, Hash as HashT, ProvideRuntimeApi};
use polkadot_consensus::{
Network, SharedTable, Collators, Statement, GenericStatement,
};
use polkadot_primitives::{AccountId, Block, Hash, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, BlockData};
use codec::Decode;
Expand All @@ -39,12 +40,20 @@ use parking_lot::Mutex;

use super::NetworkService;
use router::Router;
use gossip::{POLKADOT_ENGINE_ID, GossipMessage, RegisteredMessageValidator, MessageValidationData};

/// Compute the gossip topic used for attestations on this relay parent hash.
pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
let mut v = parent_hash.as_ref().to_vec();
v.extend(b"attestations");

BlakeTwo256::hash(&v[..])
}

// task that processes all gossipped consensus messages,
// checking signatures
struct MessageProcessTask<P, E> {
inner_stream: mpsc::UnboundedReceiver<ConsensusMessage>,
parent_hash: Hash,
inner_stream: mpsc::UnboundedReceiver<Vec<u8>>,
table_router: Router<P>,
exit: E,
}
Expand All @@ -54,19 +63,12 @@ impl<P, E> MessageProcessTask<P, E> where
P::Api: ParachainHost<Block>,
E: Future<Item=(),Error=()> + Clone + Send + 'static,
{
fn process_message(&self, msg: ConsensusMessage) -> Option<Async<()>> {
use polkadot_consensus::SignedStatement;

fn process_message(&self, msg: Vec<u8>) -> Option<Async<()>> {
debug!(target: "consensus", "Processing consensus statement for live consensus");
if let Some(statement) = SignedStatement::decode(&mut msg.as_slice()) {
if ::polkadot_consensus::check_statement(
&statement.statement,
&statement.signature,
statement.sender,
&self.parent_hash
) {
self.table_router.import_statement(statement, self.exit.clone());
}

// statements are already checked by gossip validator.
if let Some(message) = GossipMessage::decode(&mut &msg[..]) {
self.table_router.import_statement(message.statement, self.exit.clone());
}

None
Expand Down Expand Up @@ -99,13 +101,19 @@ impl<P, E> Future for MessageProcessTask<P, E> where
pub struct ConsensusNetwork<P, E> {
network: Arc<NetworkService>,
api: Arc<P>,
message_validator: RegisteredMessageValidator,
exit: E,
}

impl<P, E> ConsensusNetwork<P, E> {
/// Create a new consensus networking object.
pub fn new(network: Arc<NetworkService>, exit: E, api: Arc<P>) -> Self {
ConsensusNetwork { network, exit, api }
pub fn new(
network: Arc<NetworkService>,
exit: E,
message_validator: RegisteredMessageValidator,
api: Arc<P>,
) -> Self {
ConsensusNetwork { network, exit, message_validator, api }
}
}

Expand All @@ -115,6 +123,7 @@ impl<P, E: Clone> Clone for ConsensusNetwork<P, E> {
network: self.network.clone(),
exit: self.exit.clone(),
api: self.api.clone(),
message_validator: self.message_validator.clone(),
}
}
}
Expand All @@ -130,7 +139,7 @@ impl<P, E> Network for ConsensusNetwork<P,E> where
/// Instantiate a table router using the given shared table.
fn communication_for(
&self,
_validators: &[SessionKey],
authorities: &[SessionKey],
table: Arc<SharedTable>,
task_executor: TaskExecutor,
) -> Self::TableRouter {
Expand All @@ -146,14 +155,21 @@ impl<P, E> Network for ConsensusNetwork<P,E> where
task_executor.clone(),
parent_hash,
knowledge.clone(),
self.message_validator.clone(),
);

let attestation_topic = table_router.gossip_topic();
let exit = self.exit.clone();

// before requesting messages, note live consensus session.
self.message_validator.note_consensus(
parent_hash,
MessageValidationData { authorities: authorities.to_vec() },
);

let (tx, rx) = std::sync::mpsc::channel();
self.network.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(attestation_topic);
let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, attestation_topic);
let _ = tx.send(inner_rx);
});

Expand All @@ -168,7 +184,6 @@ impl<P, E> Network for ConsensusNetwork<P,E> where
let inner_stream = rx.try_recv().expect("1. The with_gossip closure executed first, 2. the reply should be available");
let process_task = MessageProcessTask {
inner_stream,
parent_hash,
table_router: table_router_clone,
exit,
};
Expand Down Expand Up @@ -200,7 +215,7 @@ impl Future for AwaitingCollation {
.map_err(|_| NetworkDown)
}
match self.outer.poll() {
Ok(futures::Async::Ready(mut inner)) => {
Ok(futures::Async::Ready(inner)) => {
self.inner = Some(inner);
self.poll()
},
Expand Down
159 changes: 159 additions & 0 deletions network/src/gossip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Gossip messages and the message validator

use substrate_network::consensus_gossip::{
self as network_gossip, ValidationResult as GossipValidationResult,
};
use polkadot_consensus::SignedStatement;
use polkadot_primitives::{Hash, SessionKey};
use codec::Decode;

use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;

use super::NetworkService;

/// The engine ID of the polkadot attestation system.
pub const POLKADOT_ENGINE_ID: substrate_network::ConsensusEngineId = [b'd', b'o', b't', b'1'];

/// A gossip message.
#[derive(Encode, Decode, Clone)]
pub(crate) struct GossipMessage {
/// The relay chain parent hash.
pub(crate) relay_parent: Hash,
/// The signed statement being gossipped.
pub(crate) statement: SignedStatement,
}

/// whether a block is known.
pub enum Known {
/// The block is a known leaf.
Leaf,
/// The block is known to be old.
Old,
/// The block is known to be bad.
Bad,
}

/// An oracle for known blocks.
pub trait KnownOracle: Send + Sync {
/// whether a block is known. If it's not, returns `None`.
fn is_known(&self, block_hash: &Hash) -> Option<Known>;
}

impl<F> KnownOracle for F where F: Fn(&Hash) -> Option<Known> + Send + Sync {
fn is_known(&self, block_hash: &Hash) -> Option<Known> {
(self)(block_hash)
}
}

/// Register a gossip validator on the network service.
///
/// This returns a `RegisteredMessageValidator`
// NOTE: since RegisteredMessageValidator is meant to be a type-safe proof
// that we've actually done the registration, this should be the only way
// to construct it outside of tests.
pub fn register_validator<O: KnownOracle + 'static>(
service: &NetworkService,
oracle: O,
) -> RegisteredMessageValidator {
let validator = Arc::new(MessageValidator {
live_consensus: RwLock::new(HashMap::new()),
oracle,
});

let gossip_side = validator.clone();
service.with_gossip(|gossip, _| gossip.register_validator(POLKADOT_ENGINE_ID, gossip_side));

RegisteredMessageValidator { inner: validator as _ }
}

/// A registered message validator.
///
/// Create this using `register_validator`.
#[derive(Clone)]
pub struct RegisteredMessageValidator {
inner: Arc<MessageValidator<KnownOracle>>,
}

impl RegisteredMessageValidator {
/// Note a live consensus session. This must be removed later with
/// `remove_consensus`.
pub(crate) fn note_consensus(&self, relay_parent: Hash, validation: MessageValidationData) {
self.inner.live_consensus.write().insert(relay_parent, validation);
}

/// Remove a live consensus session when it is no longer live.
pub(crate) fn remove_consensus(&self, relay_parent: &Hash) {
self.inner.live_consensus.write().remove(relay_parent);
}
}

// data needed for validating gossip.
pub(crate) struct MessageValidationData {
/// The authorities at a block.
pub(crate) authorities: Vec<SessionKey>,
}

impl MessageValidationData {
fn check_statement(&self, relay_parent: &Hash, statement: &SignedStatement) -> bool {
self.authorities.contains(&statement.sender) &&
::polkadot_consensus::check_statement(
&statement.statement,
&statement.signature,
statement.sender,
relay_parent,
)
}
}

/// An unregistered message validator. Register this with `register_validator`.
pub struct MessageValidator<O: ?Sized> {
live_consensus: RwLock<HashMap<Hash, MessageValidationData>>,
oracle: O,
}

impl<O: KnownOracle + ?Sized> network_gossip::Validator<Hash> for MessageValidator<O> {
fn validate(&self, mut data: &[u8]) -> GossipValidationResult<Hash> {
match GossipMessage::decode(&mut data) {
Some(GossipMessage { relay_parent, statement }) => {
let live = self.live_consensus.read();
let topic = || ::consensus::attestation_topic(relay_parent.clone());
if let Some(validation) = live.get(&relay_parent) {
if validation.check_statement(&relay_parent, &statement) {
GossipValidationResult::Valid(topic())
} else {
GossipValidationResult::Invalid
}
} else {
match self.oracle.is_known(&relay_parent) {
None | Some(Known::Leaf) => GossipValidationResult::Future(topic()),
Some(Known::Old) => GossipValidationResult::Expired,
Some(Known::Bad) => GossipValidationResult::Invalid,
}
}
}
None => {
debug!(target: "consensus", "Error decoding gossip message");
GossipValidationResult::Invalid
}
}
}
}
1 change: 1 addition & 0 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ mod collator_pool;
mod local_collations;
mod router;
pub mod consensus;
pub mod gossip;

use codec::{Decode, Encode};
use futures::sync::oneshot;
Expand Down
Loading

0 comments on commit 8b07458

Please sign in to comment.