Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Rework the event system of sc-network #14197

Open
wants to merge 50 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
cb99f5c
Take notification configs
altonen Mar 14, 2023
471616a
Return `NonDefaultSetConfig` from `TransactionsHandlerPrototype::new()`
altonen Mar 7, 2023
c456edd
Make fields of `NonDefaultSetConfig` private
altonen Mar 14, 2023
5593a53
Introduce `NotificationService`
altonen Mar 14, 2023
22cd46c
Add implementation of `NotificationService`
altonen Mar 14, 2023
b42b901
Initialize command streams and protocol handles in `Notifications`
altonen Mar 16, 2023
9685625
Poll commands from protocols in `Notifications`
altonen Mar 16, 2023
b93cf90
Start using `tracing_unbounded` for `Notifications` -> protocols channel
altonen Mar 20, 2023
8cc85c6
Send `NotificationEvent`s from `Notifications`
altonen Mar 20, 2023
f4f9090
Start using `NotificationService` for transactions protocol
altonen Mar 20, 2023
796f826
Implement `clone()` for `NotificationService`
altonen Mar 22, 2023
00a093a
Fix `BEEFY`, `NetworkGossip` and `GRANDPA` tests
altonen Apr 11, 2023
a3746dc
Start using `NotificationService` for `SyncingEngine`
altonen Apr 24, 2023
15d093b
Validate inbound substream before emitting `NotificationStreamOpened`
altonen Apr 25, 2023
513ca87
Convert statement store to use `NotificationService`
altonen May 9, 2023
2e9da9c
Split `NotificationService` files logically
altonen May 16, 2023
1fd9a08
Add getter for peer's `NotificationsSink`
altonen May 23, 2023
94e1e79
Merge remote-tracking branch 'origin/master' into notification-service
altonen May 25, 2023
b19ccb8
Apply suggestions from code review
altonen May 25, 2023
37db9f6
Get peer count from `SyncingEngine` in `sc-network` tests
altonen May 26, 2023
4e2ad89
Apply review comments
altonen May 26, 2023
3441cfa
Merge remote-tracking branch 'origin/master' into notification-service
altonen May 26, 2023
182fdc7
Apply suggestions from code review
altonen May 26, 2023
440402c
Introduce `NotificationEvent::NotificationSinkReplaced`
altonen May 26, 2023
96d2250
Refactor substream acceptance in `Notifications`
altonen May 26, 2023
fd41d7d
Merge remote-tracking branch 'origin/master' into notification-service
altonen Jun 5, 2023
ea56743
Rename `Peerset` functions
altonen Jun 5, 2023
1097738
Add metrics
altonen Jun 6, 2023
140b830
Merge remote-tracking branch 'origin/master' into notification-service
altonen Jun 6, 2023
711761d
Introduce `MessageSink`
altonen Jun 14, 2023
294fd49
Merge remote-tracking branch 'origin/master' into notification-service
altonen Jul 17, 2023
b9368f8
Merge remote-tracking branch 'origin/master' into notification-service
altonen Jul 24, 2023
258fd5a
Minor fixes
altonen Jul 24, 2023
1132424
Store peer role in `PeerStore`
altonen Jul 25, 2023
4b02a3d
Don't return `Result` for `send_sync_notification()`
altonen Jul 25, 2023
c3ff587
Do not pass Prometheus registry to `notification_service()`
altonen Jul 25, 2023
b8e2fcc
Fix metrics
altonen Aug 3, 2023
1b84a72
Merge remote-tracking branch 'origin/master' into notification-service
altonen Aug 3, 2023
aad966a
Rework peer role detection
altonen Aug 4, 2023
295a6a7
Remove rejected peers from `ProtocolController`
altonen Aug 4, 2023
fe8eeed
Fix BEEFY test
altonen Aug 7, 2023
ccb3beb
Remove dead code
altonen Aug 7, 2023
d38b53b
Start using `NotificationService` properly in `SyncingEngine`
altonen Aug 8, 2023
a84000f
Minor code cleanups
altonen Aug 9, 2023
92da646
Fix warnings
altonen Aug 9, 2023
18d948e
Fix documentation
altonen Aug 10, 2023
b7c3b68
Apply review comments
altonen Aug 10, 2023
10bdc28
Fix documentation
altonen Aug 10, 2023
e5e6af0
Apply suggestions from code review
altonen Aug 11, 2023
25c12b2
Apply review comments
altonen Aug 11, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 24 additions & 4 deletions client/consensus/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1022,9 +1022,7 @@ async fn should_initialize_voter_at_genesis() {
assert_eq!(state, persisted_state);
}

// TODO(aaro): fix
#[tokio::test]
#[ignore]
async fn should_initialize_voter_at_custom_genesis() {
let keys = &[BeefyKeyring::Alice];
let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
Expand All @@ -1043,7 +1041,25 @@ async fn should_initialize_voter_at_custom_genesis() {
net.peer(0).client().as_client().finalize_block(hashes[8], None).unwrap();

// load persistent state - nothing in DB, should init at genesis
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
//
// NOTE: code from `voter_init_setup()` is moved here because the new network event system
// doesn't allow creating a new `GossipEngine` as the notification handle is consumed by the
// first `GossipEngine`
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let (gossip_validator, _) = GossipValidator::new(known_peers);
let gossip_validator = Arc::new(gossip_validator);
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
net.peer(0).network_service().clone(),
net.peer(0).sync_service().clone(),
net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(),
"/beefy/whatever",
gossip_validator,
None,
);
let (beefy_genesis, best_grandpa) =
wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
let persisted_state =
load_or_init_voter_state(&*backend, &api, beefy_genesis, best_grandpa, 1).unwrap();

// Test initialization at session boundary.
// verify voter initialized with single session starting at block `custom_pallet_genesis` (7)
Expand Down Expand Up @@ -1073,7 +1089,11 @@ async fn should_initialize_voter_at_custom_genesis() {

net.peer(0).client().as_client().finalize_block(hashes[10], None).unwrap();
// load persistent state - state preset in DB, but with different pallet genesis
let new_persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
// the network state persists and uses the old `GossipEngine` initialized for `peer(0)`
let (beefy_genesis, best_grandpa) =
wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
let new_persisted_state =
load_or_init_voter_state(&*backend, &api, beefy_genesis, best_grandpa, 1).unwrap();

// verify voter initialized with single session starting at block `new_pallet_genesis` (10)
let sessions = new_persisted_state.voting_oracle().sessions();
Expand Down
31 changes: 18 additions & 13 deletions client/consensus/grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ use super::{
};
use crate::{communication::grandpa_protocol_name, environment::SharedVoterSetState};
use futures::prelude::*;
use parity_scale_codec::Encode;
use parity_scale_codec::{DecodeAll, Encode};
use sc_network::{
config::{MultiaddrWithPeerId, Role},
event::Event as NetworkEvent,
service::traits::{MessageSink, NotificationEvent, NotificationService},
service::traits::{Direction, MessageSink, NotificationEvent, NotificationService},
types::ProtocolName,
Multiaddr, NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
NetworkSyncForkRequest, NotificationSenderError, NotificationSenderT as NotificationSender,
NotificationsSink, PeerId, ReputationChange,
PeerId, ReputationChange,
};
use sc_network_common::{
role::ObservedRole,
role::{ObservedRole, Roles},
sync::{SyncEvent as SyncStreamEvent, SyncEventStream},
};
use sc_network_gossip::Validator;
Expand Down Expand Up @@ -126,6 +126,12 @@ impl NetworkPeers for TestNetwork {
fn sync_num_connected(&self) -> usize {
unimplemented!();
}

fn peer_role(&self, _peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
Roles::decode_all(&mut &handshake[..])
.ok()
.and_then(|role| Some(ObservedRole::from(role)))
}
}

impl NetworkEventStream for TestNetwork {
Expand Down Expand Up @@ -233,7 +239,6 @@ impl NotificationService for TestNotificationService {

/// Send synchronous `notification` to `peer`.
fn send_sync_notification(&self, _peer: &PeerId, _notification: Vec<u8>) {
// TODO: this needs to be implemented
unimplemented!();
}

Expand All @@ -247,7 +252,7 @@ impl NotificationService for TestNotificationService {
}

/// Set handshake for the notification protocol replacing the old handshake.
async fn set_hanshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
unimplemented!();
}

Expand All @@ -264,7 +269,7 @@ impl NotificationService for TestNotificationService {
unimplemented!();
}

fn message_sink(&self, peer: &PeerId) -> Option<Box<dyn MessageSink>> {
fn message_sink(&self, _peer: &PeerId) -> Option<Box<dyn MessageSink>> {
unimplemented!();
}
}
Expand Down Expand Up @@ -466,9 +471,9 @@ fn good_commit_leads_to_relay() {
let _ = tester.notification_tx.unbounded_send(
NotificationEvent::NotificationStreamOpened {
peer: sender_id,
role: ObservedRole::Full,
direction: Direction::Inbound,
negotiated_fallback: None,
handshake: vec![1, 3, 3, 7],
handshake: Roles::FULL.encode(),
},
);
let _ = tester.notification_tx.unbounded_send(
Expand All @@ -483,9 +488,9 @@ fn good_commit_leads_to_relay() {
let _ = tester.notification_tx.unbounded_send(
NotificationEvent::NotificationStreamOpened {
peer: receiver_id,
role: ObservedRole::Full,
direction: Direction::Inbound,
negotiated_fallback: None,
handshake: vec![1, 3, 3, 7],
handshake: Roles::FULL.encode(),
},
);

Expand Down Expand Up @@ -617,9 +622,9 @@ fn bad_commit_leads_to_report() {
let _ = tester.notification_tx.unbounded_send(
NotificationEvent::NotificationStreamOpened {
peer: sender_id,
role: ObservedRole::Full,
direction: Direction::Inbound,
negotiated_fallback: None,
handshake: vec![1, 3, 3, 7],
handshake: Roles::FULL.encode(),
},
);
let _ = tester.notification_tx.unbounded_send(
Expand Down
1 change: 1 addition & 0 deletions client/network-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ async-trait = "0.1.57"
tokio = "1.22.0"
quickcheck = { version = "1.0.3", default-features = false }
substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" }
codec = { package = "parity-scale-codec", version = "3.6.1", features = ["derive"] }
52 changes: 24 additions & 28 deletions client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,17 @@ impl<B: BlockT> Future for GossipEngine<B> {

match next_notification {
altonen marked this conversation as resolved.
Show resolved Hide resolved
Poll::Ready(Some(event)) => match event {
NotificationEvent::ValidateInboundSubstream {
peer,
handshake,
result_tx,
} => {
log::debug!(
target: "gossip",
"accepting inbound substream from {peer}, handshake {handshake:?}"
);
NotificationEvent::ValidateInboundSubstream { result_tx, .. } => {
let _ = result_tx.send(ValidationResult::Accept);
},
NotificationEvent::NotificationStreamOpened { peer, role, .. } => {
NotificationEvent::NotificationStreamOpened {
peer, handshake, ..
} => {
let Some(role) = this.network.peer_role(peer, handshake) else {
log::debug!(target: "gossip", "role for {peer} couldn't be determined");
continue
};

this.state_machine.new_peer(&mut *this.network, peer, role);
},
NotificationEvent::NotificationStreamClosed { peer } => {
Expand Down Expand Up @@ -326,6 +325,7 @@ impl<B: BlockT> futures::future::FusedFuture for GossipEngine<B> {
mod tests {
use super::*;
use crate::{ValidationResult, ValidatorContext};
use codec::{DecodeAll, Encode};
use futures::{
channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
executor::{block_on, block_on_stream},
Expand All @@ -335,10 +335,10 @@ mod tests {
use quickcheck::{Arbitrary, Gen, QuickCheck};
use sc_network::{
config::MultiaddrWithPeerId,
service::traits::{MessageSink, NotificationEvent},
service::traits::{Direction, MessageSink, NotificationEvent},
Event, NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
NotificationSenderError, NotificationSenderT as NotificationSender, NotificationService,
NotificationsSink,
Roles,
};
use sc_network_common::{role::ObservedRole, sync::SyncEventStream};
use sp_runtime::{
Expand Down Expand Up @@ -419,6 +419,12 @@ mod tests {
fn sync_num_connected(&self) -> usize {
unimplemented!();
}

fn peer_role(&self, _peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
Roles::decode_all(&mut &handshake[..])
.ok()
.and_then(|role| Some(ObservedRole::from(role)))
}
}

impl NetworkEventStream for TestNetwork {
Expand Down Expand Up @@ -500,28 +506,20 @@ mod tests {
rx: UnboundedReceiver<NotificationEvent>,
}

// TODO: provide implementation
#[async_trait::async_trait]
impl sc_network::service::traits::NotificationService for TestNotificationService {
/// Instruct `Notifications` to open a new substream for `peer`.
///
/// `dial_if_disconnected` informs `Notifications` whether to dial
// the peer if there is currently no active connection to it.
async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
unimplemented!();
}

/// Instruct `Notifications` to close substream for `peer`.
async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
unimplemented!();
}

/// Send synchronous `notification` to `peer`.
fn send_sync_notification(&self, _peer: &PeerId, _notification: Vec<u8>) {
unimplemented!();
}

/// Send asynchronous `notification` to `peer`, allowing sender to exercise backpressure.
async fn send_async_notification(
&self,
_peer: &PeerId,
Expand All @@ -530,12 +528,10 @@ mod tests {
unimplemented!();
}

/// Set handshake for the notification protocol replacing the old handshake.
async fn set_hanshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
unimplemented!();
}

/// Get next event from the `Notifications` event stream.
async fn next_event(&mut self) -> Option<NotificationEvent> {
self.rx.next().await
}
Expand All @@ -548,7 +544,7 @@ mod tests {
unimplemented!();
}

fn message_sink(&self, peer: &PeerId) -> Option<Box<dyn MessageSink>> {
fn message_sink(&self, _peer: &PeerId) -> Option<Box<dyn MessageSink>> {
unimplemented!();
}
}
Expand Down Expand Up @@ -620,9 +616,9 @@ mod tests {
// Register the remote peer.
tx.send(NotificationEvent::NotificationStreamOpened {
peer: remote_peer,
role: ObservedRole::Authority,
direction: Direction::Inbound,
negotiated_fallback: None,
handshake: vec![1, 3, 3, 7],
handshake: Roles::FULL.encode(),
})
.await
.unwrap();
Expand Down Expand Up @@ -783,9 +779,9 @@ mod tests {
// Register the remote peer.
tx.start_send(NotificationEvent::NotificationStreamOpened {
peer: remote_peer,
role: ObservedRole::Authority,
direction: Direction::Inbound,
negotiated_fallback: None,
handshake: vec![1, 3, 3, 7],
handshake: Roles::FULL.encode(),
})
.unwrap();

Expand Down
4 changes: 4 additions & 0 deletions client/network-gossip/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,10 @@ mod tests {
fn sync_num_connected(&self) -> usize {
unimplemented!();
}

fn peer_role(&self, _peer_id: PeerId, _handshake: Vec<u8>) -> Option<ObservedRole> {
None
}
}

impl NetworkEventStream for NoOpNetwork {
Expand Down
12 changes: 12 additions & 0 deletions client/network/common/src/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ impl ObservedRole {
}
}

impl From<Roles> for ObservedRole {
fn from(roles: Roles) -> Self {
if roles.is_authority() {
ObservedRole::Authority
} else if roles.is_full() {
ObservedRole::Full
} else {
ObservedRole::Light
}
}
}

/// Role of the local node.
#[derive(Debug, Clone)]
pub enum Role {
Expand Down
6 changes: 1 addition & 5 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ pub use sc_network_common::{
sync::{warp::WarpSyncProvider, SyncMode},
ExHashT,
};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::Block as BlockT;

use std::{
Expand Down Expand Up @@ -541,7 +540,7 @@ impl NonDefaultSetConfig {
}

/// Take `ProtocolHandlePair` from `NonDefaultSetConfig`
pub(crate) fn take_protocol_handle(self) -> ProtocolHandlePair {
pub fn take_protocol_handle(self) -> ProtocolHandlePair {
self.protocol_handle_pair
}

Expand Down Expand Up @@ -746,9 +745,6 @@ pub struct Params<Block: BlockT> {

/// Block announce protocol configuration
pub block_announce_config: NonDefaultSetConfig,

/// TX channel for direct communication with `SyncingEngine` and `Protocol`.
pub tx: TracingUnboundedSender<crate::event::SyncEvent<Block>>,
}

/// Full network configuration.
Expand Down
15 changes: 8 additions & 7 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ pub use event::{DhtEvent, Event, SyncEvent};
pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use request_responses::{Config, IfDisconnected, RequestFailure};
pub use sc_network_common::{
role::ObservedRole,
role::{ObservedRole, Roles},
sync::{
warp::{WarpSyncPhase, WarpSyncProgress},
ExtendedPeerInfo, StateDownloadProgress, SyncEventStream, SyncState, SyncStatusProvider,
Expand All @@ -277,13 +277,14 @@ pub use sc_network_common::{
pub use service::{
signature::Signature,
traits::{
KademliaKey, NetworkBlock, NetworkDHTProvider, NetworkEventStream, NetworkNotification,
NetworkPeers, NetworkRequest, NetworkSigner, NetworkStateInfo, NetworkStatus,
NetworkStatusProvider, NetworkSyncForkRequest, NotificationSender as NotificationSenderT,
NotificationSenderError, NotificationSenderReady, NotificationService,
KademliaKey, MessageSink, NetworkBlock, NetworkDHTProvider, NetworkEventStream,
NetworkNotification, NetworkPeers, NetworkRequest, NetworkSigner, NetworkStateInfo,
NetworkStatus, NetworkStatusProvider, NetworkSyncForkRequest,
NotificationSender as NotificationSenderT, NotificationSenderError,
NotificationSenderReady, NotificationService,
},
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, NotificationsSink,
OutboundFailure, PublicKey,
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationCommand, NotificationSender,
NotificationsSink, OutboundFailure, ProtocolHandle, PublicKey,
};
pub use types::ProtocolName;

Expand Down
2 changes: 1 addition & 1 deletion client/network/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl PeerStoreProvider for MockPeerStore {
None
}

fn set_peer_role(&mut self, peer_id: &PeerId, role: ObservedRole) {
fn set_peer_role(&mut self, _peer_id: &PeerId, _role: ObservedRole) {
unimplemented!();
}

Expand Down
Loading