Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Update to new gossip system. #172

Merged
merged 6 commits into from
Mar 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
652 changes: 288 additions & 364 deletions Cargo.lock

Large diffs are not rendered by default.

169 changes: 169 additions & 0 deletions network/src/gossip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Gossip messages and the message validator

use substrate_network::consensus_gossip::{
self as network_gossip, ValidationResult as GossipValidationResult,
};
use polkadot_validation::SignedStatement;
use polkadot_primitives::{Hash, SessionKey};
use codec::Decode;

use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;

use super::NetworkService;

/// The engine ID of the polkadot attestation system.
pub const POLKADOT_ENGINE_ID: substrate_network::ConsensusEngineId = [b'd', b'o', b't', b'1'];

/// A gossip message.
#[derive(Encode, Decode, Clone)]
pub(crate) struct GossipMessage {
/// The relay chain parent hash.
pub(crate) relay_parent: Hash,
/// The signed statement being gossipped.
pub(crate) statement: SignedStatement,
}

/// whether a block is known.
pub enum Known {
/// The block is a known leaf.
Leaf,
/// The block is known to be old.
Old,
/// The block is known to be bad.
Bad,
}

/// An oracle for known blocks.
pub trait KnownOracle: Send + Sync {
/// whether a block is known. If it's not, returns `None`.
fn is_known(&self, block_hash: &Hash) -> Option<Known>;
}

impl<F> KnownOracle for F where F: Fn(&Hash) -> Option<Known> + Send + Sync {
fn is_known(&self, block_hash: &Hash) -> Option<Known> {
(self)(block_hash)
}
}

/// Register a gossip validator on the network service.
///
/// This returns a `RegisteredMessageValidator`
// NOTE: since RegisteredMessageValidator is meant to be a type-safe proof
// that we've actually done the registration, this should be the only way
// to construct it outside of tests.
pub fn register_validator<O: KnownOracle + 'static>(
service: &NetworkService,
oracle: O,
) -> RegisteredMessageValidator {
let validator = Arc::new(MessageValidator {
live_session: RwLock::new(HashMap::new()),
oracle,
});

let gossip_side = validator.clone();
service.with_gossip(|gossip, _| gossip.register_validator(POLKADOT_ENGINE_ID, gossip_side));

RegisteredMessageValidator { inner: validator as _ }
}

/// A registered message validator.
///
/// Create this using `register_validator`.
#[derive(Clone)]
pub struct RegisteredMessageValidator {
inner: Arc<MessageValidator<KnownOracle>>,
}

impl RegisteredMessageValidator {
#[cfg(test)]
pub(crate) fn new_test<O: KnownOracle + 'static>(oracle: O) -> Self {
let validator = Arc::new(MessageValidator {
live_session: RwLock::new(HashMap::new()),
oracle,
});

RegisteredMessageValidator { inner: validator as _ }
}

/// Note a live attestation session. This must be removed later with
/// `remove_session`.
pub(crate) fn note_session(&self, relay_parent: Hash, validation: MessageValidationData) {
self.inner.live_session.write().insert(relay_parent, validation);
}

/// Remove a live attestation session when it is no longer live.
pub(crate) fn remove_session(&self, relay_parent: &Hash) {
self.inner.live_session.write().remove(relay_parent);
}
}

// data needed for validating gossip.
pub(crate) struct MessageValidationData {
/// The authorities at a block.
pub(crate) authorities: Vec<SessionKey>,
}

impl MessageValidationData {
fn check_statement(&self, relay_parent: &Hash, statement: &SignedStatement) -> bool {
self.authorities.contains(&statement.sender) &&
::polkadot_validation::check_statement(
&statement.statement,
&statement.signature,
statement.sender,
relay_parent,
)
}
}

/// An unregistered message validator. Register this with `register_validator`.
pub struct MessageValidator<O: ?Sized> {
live_session: RwLock<HashMap<Hash, MessageValidationData>>,
oracle: O,
}

impl<O: KnownOracle + ?Sized> network_gossip::Validator<Hash> for MessageValidator<O> {
fn validate(&self, mut data: &[u8]) -> GossipValidationResult<Hash> {
match GossipMessage::decode(&mut data) {
Some(GossipMessage { relay_parent, statement }) => {
let live = self.live_session.read();
let topic = || ::router::attestation_topic(relay_parent.clone());
if let Some(validation) = live.get(&relay_parent) {
if validation.check_statement(&relay_parent, &statement) {
GossipValidationResult::Valid(topic())
} else {
GossipValidationResult::Invalid
}
} else {
match self.oracle.is_known(&relay_parent) {
None | Some(Known::Leaf) => GossipValidationResult::Future(topic()),
Some(Known::Old) => GossipValidationResult::Expired,
Some(Known::Bad) => GossipValidationResult::Invalid,
}
}
}
None => {
debug!(target: "validation", "Error decoding gossip message");
GossipValidationResult::Invalid
}
}
}
}
1 change: 1 addition & 0 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mod collator_pool;
mod local_collations;
mod router;
pub mod validation;
pub mod gossip;

use codec::{Decode, Encode};
use futures::sync::oneshot;
Expand Down
9 changes: 8 additions & 1 deletion network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ use std::collections::{hash_map::{Entry, HashMap}, HashSet};
use std::{io, mem};
use std::sync::Arc;

use gossip::{RegisteredMessageValidator};
use validation::{NetworkService, Knowledge, Executor};

type IngressPair = (ParaId, Vec<Message>);
type IngressPairRef<'a> = (ParaId, &'a [Message]);

fn attestation_topic(parent_hash: Hash) -> Hash {
/// Compute the gossip topic for attestations on the given parent hash.
pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
let mut v = parent_hash.as_ref().to_vec();
v.extend(b"attestations");

Expand Down Expand Up @@ -124,6 +126,7 @@ pub struct Router<P, E, N: NetworkService, T> {
knowledge: Arc<Mutex<Knowledge>>,
fetch_incoming: Arc<Mutex<HashMap<ParaId, IncomingReceiver>>>,
deferred_statements: Arc<Mutex<DeferredStatements>>,
message_validator: RegisteredMessageValidator,
}

impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
Expand All @@ -135,6 +138,7 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
parent_hash: Hash,
knowledge: Arc<Mutex<Knowledge>>,
exit: E,
message_validator: RegisteredMessageValidator,
) -> Self {
Router {
table,
Expand All @@ -147,6 +151,7 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
fetch_incoming: Arc::new(Mutex::new(HashMap::new())),
deferred_statements: Arc::new(Mutex::new(DeferredStatements::new())),
exit,
message_validator,
}
}

Expand All @@ -169,6 +174,7 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for Router<P, E, N, T> {
fetch_incoming: self.fetch_incoming.clone(),
knowledge: self.knowledge.clone(),
exit: self.exit.clone(),
message_validator: self.message_validator.clone(),
}
}
}
Expand Down Expand Up @@ -392,6 +398,7 @@ impl<P, E, N: NetworkService, T> Drop for Router<P, E, N, T> {
));
}
}
self.message_validator.remove_session(&parent_hash);
}
}

Expand Down
1 change: 1 addition & 0 deletions network/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl TestContext {
fn make_status(status: &Status, roles: Roles) -> FullStatus {
FullStatus {
version: 1,
min_supported_version: 1,
roles,
best_number: 0,
best_hash: Default::default(),
Expand Down
47 changes: 31 additions & 16 deletions network/src/tests/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! Tests and helpers for validation networking.

use validation::NetworkService;
use substrate_network::{consensus_gossip::ConsensusMessage, Context as NetContext};
use substrate_network::Context as NetContext;
use substrate_primitives::{Ed25519AuthorityId, NativeOrEncoded};
use substrate_keyring::Keyring;
use {PolkadotProtocol};
Expand Down Expand Up @@ -51,21 +51,21 @@ impl Future for NeverExit {
}

struct GossipRouter {
incoming_messages: mpsc::UnboundedReceiver<(Hash, ConsensusMessage)>,
incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<ConsensusMessage>)>,
outgoing: Vec<(Hash, mpsc::UnboundedSender<ConsensusMessage>)>,
messages: Vec<(Hash, ConsensusMessage)>,
incoming_messages: mpsc::UnboundedReceiver<(Hash, Vec<u8>)>,
incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<Vec<u8>>)>,
outgoing: Vec<(Hash, mpsc::UnboundedSender<Vec<u8>>)>,
messages: Vec<(Hash, Vec<u8>)>,
}

impl GossipRouter {
fn add_message(&mut self, topic: Hash, message: ConsensusMessage) {
fn add_message(&mut self, topic: Hash, message: Vec<u8>) {
self.outgoing.retain(|&(ref o_topic, ref sender)| {
o_topic != &topic || sender.unbounded_send(message.clone()).is_ok()
});
self.messages.push((topic, message));
}

fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender<ConsensusMessage>) {
fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender<Vec<u8>>) {
for message in self.messages.iter()
.filter(|&&(ref t, _)| t == &topic)
.map(|&(_, ref msg)| msg.clone())
Expand Down Expand Up @@ -105,8 +105,8 @@ impl Future for GossipRouter {

#[derive(Clone)]
struct GossipHandle {
send_message: mpsc::UnboundedSender<(Hash, ConsensusMessage)>,
send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<ConsensusMessage>)>,
send_message: mpsc::UnboundedSender<(Hash, Vec<u8>)>,
send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<Vec<u8>>)>,
}

fn make_gossip() -> (GossipRouter, GossipHandle) {
Expand All @@ -130,13 +130,13 @@ struct TestNetwork {
}

impl NetworkService for TestNetwork {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<ConsensusMessage> {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<Vec<u8>> {
let (tx, rx) = mpsc::unbounded();
let _ = self.gossip.send_listener.unbounded_send((topic, tx));
rx
}

fn gossip_message(&self, topic: Hash, message: ConsensusMessage) {
fn gossip_message(&self, topic: Hash, message: Vec<u8>) {
let _ = self.gossip.send_message.unbounded_send((topic, message));
}

Expand Down Expand Up @@ -322,9 +322,14 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built {
gossip: gossip_handle.clone(),
});

let message_val = crate::gossip::RegisteredMessageValidator::new_test(
|_hash: &_| Some(crate::gossip::Known::Leaf),
);

TestValidationNetwork::new(
net,
NeverExit,
message_val,
runtime_api.clone(),
executor.clone(),
)
Expand Down Expand Up @@ -427,30 +432,40 @@ fn ingress_fetch_works() {
let parent_hash = [1; 32].into();

let (router_a, router_b, router_c) = {
let validators: Vec<Hash> = vec![
key_a.to_raw_public().into(),
key_b.to_raw_public().into(),
key_c.to_raw_public().into(),
];

let authorities: Vec<_> = validators.iter().cloned()
.map(|h| h.to_fixed_bytes())
.map(Ed25519AuthorityId)
.collect();

let mut api_handle = built.api_handle.lock();
*api_handle = ApiData {
active_parachains: vec![id_a, id_b, id_c],
duties: vec![Chain::Parachain(id_a), Chain::Parachain(id_b), Chain::Parachain(id_c)],
validators: vec![
key_a.to_raw_public().into(),
key_b.to_raw_public().into(),
key_c.to_raw_public().into(),
],
validators,
ingress,
};

(
built.networks[0].communication_for(
make_table(&*api_handle, &key_a, parent_hash),
vec![MessagesFrom::from_messages(id_a, messages_from_a)],
&authorities,
),
built.networks[1].communication_for(
make_table(&*api_handle, &key_b, parent_hash),
vec![MessagesFrom::from_messages(id_b, messages_from_b)],
&authorities,
),
built.networks[2].communication_for(
make_table(&*api_handle, &key_c, parent_hash),
vec![MessagesFrom::from_messages(id_c, messages_from_c)],
&authorities,
),
)
};
Expand Down
Loading