Skip to content

Commit

Permalink
Update to new gossip system. (paritytech#172)
Browse files Browse the repository at this point in the history
* Integrates new gossip system into Polkadot (paritytech#166)

* new gossip validation in network

* integrate new gossip into service

* Fix build

* Fix claims module

* fix warning

* update to latest master again

* update runtime
  • Loading branch information
rphmeier authored and bkchr committed Mar 6, 2019
1 parent 6e9ab77 commit 222c6c2
Show file tree
Hide file tree
Showing 17 changed files with 832 additions and 618 deletions.
652 changes: 288 additions & 364 deletions Cargo.lock

Large diffs are not rendered by default.

169 changes: 169 additions & 0 deletions network/src/gossip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Gossip messages and the message validator

use substrate_network::consensus_gossip::{
self as network_gossip, ValidationResult as GossipValidationResult,
};
use polkadot_validation::SignedStatement;
use polkadot_primitives::{Hash, SessionKey};
use codec::Decode;

use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;

use super::NetworkService;

/// The engine ID of the polkadot attestation system.
pub const POLKADOT_ENGINE_ID: substrate_network::ConsensusEngineId = [b'd', b'o', b't', b'1'];

/// A gossip message.
#[derive(Encode, Decode, Clone)]
pub(crate) struct GossipMessage {
/// The relay chain parent hash.
pub(crate) relay_parent: Hash,
/// The signed statement being gossipped.
pub(crate) statement: SignedStatement,
}

/// whether a block is known.
pub enum Known {
/// The block is a known leaf.
Leaf,
/// The block is known to be old.
Old,
/// The block is known to be bad.
Bad,
}

/// An oracle for known blocks.
pub trait KnownOracle: Send + Sync {
/// whether a block is known. If it's not, returns `None`.
fn is_known(&self, block_hash: &Hash) -> Option<Known>;
}

impl<F> KnownOracle for F where F: Fn(&Hash) -> Option<Known> + Send + Sync {
fn is_known(&self, block_hash: &Hash) -> Option<Known> {
(self)(block_hash)
}
}

/// Register a gossip validator on the network service.
///
/// This returns a `RegisteredMessageValidator`
// NOTE: since RegisteredMessageValidator is meant to be a type-safe proof
// that we've actually done the registration, this should be the only way
// to construct it outside of tests.
pub fn register_validator<O: KnownOracle + 'static>(
service: &NetworkService,
oracle: O,
) -> RegisteredMessageValidator {
let validator = Arc::new(MessageValidator {
live_session: RwLock::new(HashMap::new()),
oracle,
});

let gossip_side = validator.clone();
service.with_gossip(|gossip, _| gossip.register_validator(POLKADOT_ENGINE_ID, gossip_side));

RegisteredMessageValidator { inner: validator as _ }
}

/// A registered message validator.
///
/// Create this using `register_validator`.
#[derive(Clone)]
pub struct RegisteredMessageValidator {
inner: Arc<MessageValidator<KnownOracle>>,
}

impl RegisteredMessageValidator {
#[cfg(test)]
pub(crate) fn new_test<O: KnownOracle + 'static>(oracle: O) -> Self {
let validator = Arc::new(MessageValidator {
live_session: RwLock::new(HashMap::new()),
oracle,
});

RegisteredMessageValidator { inner: validator as _ }
}

/// Note a live attestation session. This must be removed later with
/// `remove_session`.
pub(crate) fn note_session(&self, relay_parent: Hash, validation: MessageValidationData) {
self.inner.live_session.write().insert(relay_parent, validation);
}

/// Remove a live attestation session when it is no longer live.
pub(crate) fn remove_session(&self, relay_parent: &Hash) {
self.inner.live_session.write().remove(relay_parent);
}
}

// data needed for validating gossip.
pub(crate) struct MessageValidationData {
/// The authorities at a block.
pub(crate) authorities: Vec<SessionKey>,
}

impl MessageValidationData {
fn check_statement(&self, relay_parent: &Hash, statement: &SignedStatement) -> bool {
self.authorities.contains(&statement.sender) &&
::polkadot_validation::check_statement(
&statement.statement,
&statement.signature,
statement.sender,
relay_parent,
)
}
}

/// An unregistered message validator. Register this with `register_validator`.
pub struct MessageValidator<O: ?Sized> {
live_session: RwLock<HashMap<Hash, MessageValidationData>>,
oracle: O,
}

impl<O: KnownOracle + ?Sized> network_gossip::Validator<Hash> for MessageValidator<O> {
fn validate(&self, mut data: &[u8]) -> GossipValidationResult<Hash> {
match GossipMessage::decode(&mut data) {
Some(GossipMessage { relay_parent, statement }) => {
let live = self.live_session.read();
let topic = || ::router::attestation_topic(relay_parent.clone());
if let Some(validation) = live.get(&relay_parent) {
if validation.check_statement(&relay_parent, &statement) {
GossipValidationResult::Valid(topic())
} else {
GossipValidationResult::Invalid
}
} else {
match self.oracle.is_known(&relay_parent) {
None | Some(Known::Leaf) => GossipValidationResult::Future(topic()),
Some(Known::Old) => GossipValidationResult::Expired,
Some(Known::Bad) => GossipValidationResult::Invalid,
}
}
}
None => {
debug!(target: "validation", "Error decoding gossip message");
GossipValidationResult::Invalid
}
}
}
}
1 change: 1 addition & 0 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mod collator_pool;
mod local_collations;
mod router;
pub mod validation;
pub mod gossip;

use codec::{Decode, Encode};
use futures::sync::oneshot;
Expand Down
9 changes: 8 additions & 1 deletion network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ use std::collections::{hash_map::{Entry, HashMap}, HashSet};
use std::{io, mem};
use std::sync::Arc;

use gossip::{RegisteredMessageValidator};
use validation::{NetworkService, Knowledge, Executor};

type IngressPair = (ParaId, Vec<Message>);
type IngressPairRef<'a> = (ParaId, &'a [Message]);

fn attestation_topic(parent_hash: Hash) -> Hash {
/// Compute the gossip topic for attestations on the given parent hash.
pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
let mut v = parent_hash.as_ref().to_vec();
v.extend(b"attestations");

Expand Down Expand Up @@ -124,6 +126,7 @@ pub struct Router<P, E, N: NetworkService, T> {
knowledge: Arc<Mutex<Knowledge>>,
fetch_incoming: Arc<Mutex<HashMap<ParaId, IncomingReceiver>>>,
deferred_statements: Arc<Mutex<DeferredStatements>>,
message_validator: RegisteredMessageValidator,
}

impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
Expand All @@ -135,6 +138,7 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
parent_hash: Hash,
knowledge: Arc<Mutex<Knowledge>>,
exit: E,
message_validator: RegisteredMessageValidator,
) -> Self {
Router {
table,
Expand All @@ -147,6 +151,7 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
fetch_incoming: Arc::new(Mutex::new(HashMap::new())),
deferred_statements: Arc::new(Mutex::new(DeferredStatements::new())),
exit,
message_validator,
}
}

Expand All @@ -169,6 +174,7 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for Router<P, E, N, T> {
fetch_incoming: self.fetch_incoming.clone(),
knowledge: self.knowledge.clone(),
exit: self.exit.clone(),
message_validator: self.message_validator.clone(),
}
}
}
Expand Down Expand Up @@ -392,6 +398,7 @@ impl<P, E, N: NetworkService, T> Drop for Router<P, E, N, T> {
));
}
}
self.message_validator.remove_session(&parent_hash);
}
}

Expand Down
1 change: 1 addition & 0 deletions network/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl TestContext {
fn make_status(status: &Status, roles: Roles) -> FullStatus {
FullStatus {
version: 1,
min_supported_version: 1,
roles,
best_number: 0,
best_hash: Default::default(),
Expand Down
47 changes: 31 additions & 16 deletions network/src/tests/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! Tests and helpers for validation networking.

use validation::NetworkService;
use substrate_network::{consensus_gossip::ConsensusMessage, Context as NetContext};
use substrate_network::Context as NetContext;
use substrate_primitives::{Ed25519AuthorityId, NativeOrEncoded};
use substrate_keyring::Keyring;
use {PolkadotProtocol};
Expand Down Expand Up @@ -51,21 +51,21 @@ impl Future for NeverExit {
}

struct GossipRouter {
incoming_messages: mpsc::UnboundedReceiver<(Hash, ConsensusMessage)>,
incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<ConsensusMessage>)>,
outgoing: Vec<(Hash, mpsc::UnboundedSender<ConsensusMessage>)>,
messages: Vec<(Hash, ConsensusMessage)>,
incoming_messages: mpsc::UnboundedReceiver<(Hash, Vec<u8>)>,
incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<Vec<u8>>)>,
outgoing: Vec<(Hash, mpsc::UnboundedSender<Vec<u8>>)>,
messages: Vec<(Hash, Vec<u8>)>,
}

impl GossipRouter {
fn add_message(&mut self, topic: Hash, message: ConsensusMessage) {
fn add_message(&mut self, topic: Hash, message: Vec<u8>) {
self.outgoing.retain(|&(ref o_topic, ref sender)| {
o_topic != &topic || sender.unbounded_send(message.clone()).is_ok()
});
self.messages.push((topic, message));
}

fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender<ConsensusMessage>) {
fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender<Vec<u8>>) {
for message in self.messages.iter()
.filter(|&&(ref t, _)| t == &topic)
.map(|&(_, ref msg)| msg.clone())
Expand Down Expand Up @@ -105,8 +105,8 @@ impl Future for GossipRouter {

#[derive(Clone)]
struct GossipHandle {
send_message: mpsc::UnboundedSender<(Hash, ConsensusMessage)>,
send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<ConsensusMessage>)>,
send_message: mpsc::UnboundedSender<(Hash, Vec<u8>)>,
send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<Vec<u8>>)>,
}

fn make_gossip() -> (GossipRouter, GossipHandle) {
Expand All @@ -130,13 +130,13 @@ struct TestNetwork {
}

impl NetworkService for TestNetwork {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<ConsensusMessage> {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<Vec<u8>> {
let (tx, rx) = mpsc::unbounded();
let _ = self.gossip.send_listener.unbounded_send((topic, tx));
rx
}

fn gossip_message(&self, topic: Hash, message: ConsensusMessage) {
fn gossip_message(&self, topic: Hash, message: Vec<u8>) {
let _ = self.gossip.send_message.unbounded_send((topic, message));
}

Expand Down Expand Up @@ -322,9 +322,14 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built {
gossip: gossip_handle.clone(),
});

let message_val = crate::gossip::RegisteredMessageValidator::new_test(
|_hash: &_| Some(crate::gossip::Known::Leaf),
);

TestValidationNetwork::new(
net,
NeverExit,
message_val,
runtime_api.clone(),
executor.clone(),
)
Expand Down Expand Up @@ -427,30 +432,40 @@ fn ingress_fetch_works() {
let parent_hash = [1; 32].into();

let (router_a, router_b, router_c) = {
let validators: Vec<Hash> = vec![
key_a.to_raw_public().into(),
key_b.to_raw_public().into(),
key_c.to_raw_public().into(),
];

let authorities: Vec<_> = validators.iter().cloned()
.map(|h| h.to_fixed_bytes())
.map(Ed25519AuthorityId)
.collect();

let mut api_handle = built.api_handle.lock();
*api_handle = ApiData {
active_parachains: vec![id_a, id_b, id_c],
duties: vec![Chain::Parachain(id_a), Chain::Parachain(id_b), Chain::Parachain(id_c)],
validators: vec![
key_a.to_raw_public().into(),
key_b.to_raw_public().into(),
key_c.to_raw_public().into(),
],
validators,
ingress,
};

(
built.networks[0].communication_for(
make_table(&*api_handle, &key_a, parent_hash),
vec![MessagesFrom::from_messages(id_a, messages_from_a)],
&authorities,
),
built.networks[1].communication_for(
make_table(&*api_handle, &key_b, parent_hash),
vec![MessagesFrom::from_messages(id_b, messages_from_b)],
&authorities,
),
built.networks[2].communication_for(
make_table(&*api_handle, &key_c, parent_hash),
vec![MessagesFrom::from_messages(id_c, messages_from_c)],
&authorities,
),
)
};
Expand Down
Loading

0 comments on commit 222c6c2

Please sign in to comment.