Skip to content

Commit

Permalink
Use only one networking service for all chains (#1398)
Browse files Browse the repository at this point in the history
* Add a `NetworkServiceChain` type

* Use one channel per chain

* Add back a channel for the main service messages

* Allow adding a chain to an existing service

* Use the same networking service for all chains

* Add log messages for adding and removing chain

* Also clean up the peering strategy

* Handle duplicate chains by using reference counting

* CHANGELOG

* Docfix

* More docfix
  • Loading branch information
tomaka authored Nov 24, 2023
1 parent 972d48e commit 89fd7d8
Show file tree
Hide file tree
Showing 13 changed files with 427 additions and 396 deletions.
33 changes: 33 additions & 0 deletions lib/src/network/basic_peering_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,39 @@ where
}
}

/// Removes all the chain assignments for the given chain.
///
/// If a peer isn't assigned to any chain anymore and doesn't have any connected address,
/// all of its addresses are also removed from the collection.
pub fn remove_chain_peers(&mut self, chain: &TChainId) {
let Some(chain_index) = self.chains_indices.remove(chain) else {
// Chain didn't exist.
return;
};
self.chains.remove(chain_index);

let chain_peers = {
let mut in_chain_and_after_chain = self.peers_chains_by_state.split_off(&(
chain_index,
PeerChainState::Assignable,
usize::min_value(),
));
let mut after_chain = in_chain_and_after_chain.split_off(&(
chain_index + 1,
PeerChainState::Assignable,
usize::min_value(),
));
self.peers_chains_by_state.append(&mut after_chain);
in_chain_and_after_chain
};

for (_, _, peer_id_index) in chain_peers {
let _was_in = self.peers_chains.remove(&(peer_id_index, chain_index));
debug_assert!(_was_in.is_some());
self.try_clean_up_peer_id(peer_id_index);
}
}

/// Inserts a chain-peer combination to the collection, indicating that the given peer belongs
/// to the given chain.
///
Expand Down
5 changes: 4 additions & 1 deletion lib/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,10 @@ where
SubstreamId::max_value(),
),
)
.find(|(_, _, direction, _, _)| matches!(*direction, SubstreamDirection::Out))
.find(|(_, _, direction, state, _)| {
matches!(*direction, SubstreamDirection::Out)
&& matches!(*state, NotificationsSubstreamState::Open)
})
.is_some()
{
return Err(RemoveChainError::InUse);
Expand Down
5 changes: 2 additions & 3 deletions light-base/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ pub struct DatabaseContentRuntimeCodeHint {
/// The returned string is guaranteed to not exceed `max_size` bytes. A truncated or invalid
/// database is intentionally returned if `max_size` is too low to fit all the information.
pub async fn encode_database<TPlat: platform::PlatformRef>(
network_service: &network_service::NetworkService<TPlat>,
network_service_chain_id: network_service::ChainId,
network_service: &network_service::NetworkServiceChain<TPlat>,
sync_service: &sync_service::SyncService<TPlat>,
runtime_service: &runtime_service::RuntimeService<TPlat>,
genesis_block_hash: &[u8; 32],
Expand All @@ -102,7 +101,7 @@ pub async fn encode_database<TPlat: platform::PlatformRef>(
serde_json::from_str(&encoded).unwrap()
}),
nodes: network_service
.discovered_nodes(network_service_chain_id)
.discovered_nodes()
.await
.map(|(peer_id, addrs)| {
(
Expand Down
5 changes: 1 addition & 4 deletions light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,7 @@ pub struct StartConfig<'a, TPlat: PlatformRef> {

/// Access to the network, and identifier of the chain from the point of view of the network
/// service.
pub network_service: (
Arc<network_service::NetworkService<TPlat>>,
network_service::ChainId,
),
pub network_service: Arc<network_service::NetworkServiceChain<TPlat>>,

/// Service responsible for synchronizing the chain.
pub sync_service: Arc<sync_service::SyncService<TPlat>>,
Expand Down
13 changes: 3 additions & 10 deletions light-base/src/json_rpc_service/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@ struct Background<TPlat: PlatformRef> {
system_version: String,

/// See [`StartConfig::network_service`].
network_service: (
Arc<network_service::NetworkService<TPlat>>,
network_service::ChainId,
),
network_service: Arc<network_service::NetworkServiceChain<TPlat>>,

/// See [`StartConfig::sync_service`].
sync_service: Arc<sync_service::SyncService<TPlat>>,
/// See [`StartConfig::runtime_service`].
Expand Down Expand Up @@ -680,12 +678,7 @@ impl<TPlat: PlatformRef> Background<TPlat> {
match PeerId::from_bytes(peer_id_bytes) {
Ok(peer_id) => {
self.network_service
.0
.discover(
self.network_service.1,
iter::once((peer_id, iter::once(addr))),
false,
)
.discover(iter::once((peer_id, iter::once(addr))), false)
.await;
request.respond(methods::Response::sudo_unstable_p2pDiscover(()));
}
Expand Down
3 changes: 1 addition & 2 deletions light-base/src/json_rpc_service/background/getters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ impl<TPlat: PlatformRef> Background<TPlat> {
};

let response = crate::database::encode_database(
&self.network_service.0,
self.network_service.1,
&self.network_service,
&self.sync_service,
&self.runtime_service,
&self.genesis_block_hash,
Expand Down
146 changes: 69 additions & 77 deletions light-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ pub struct Client<TPlat: platform::PlatformRef, TChain = ()> {
/// Because we use a `SipHasher`, this hashmap isn't created in the `new` function (as this
/// function is `const`) but lazily the first time it is needed.
chains_by_key: Option<HashMap<ChainKey, RunningChain<TPlat>, util::SipHasherBuild>>,

/// All chains share a single networking service created lazily the first time that it
/// is used.
network_service: Option<Arc<network_service::NetworkService<TPlat>>>,
}

struct PublicApiChain<TChain> {
Expand Down Expand Up @@ -278,8 +282,7 @@ struct RunningChain<TPlat: platform::PlatformRef> {
}

struct ChainServices<TPlat: platform::PlatformRef> {
network_service: Arc<network_service::NetworkService<TPlat>>,
network_service_chain_id: network_service::ChainId,
network_service: Arc<network_service::NetworkServiceChain<TPlat>>,
sync_service: Arc<sync_service::SyncService<TPlat>>,
runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,
transactions_service: Arc<transactions_service::TransactionsService<TPlat>>,
Expand All @@ -289,7 +292,6 @@ impl<TPlat: platform::PlatformRef> Clone for ChainServices<TPlat> {
fn clone(&self) -> Self {
ChainServices {
network_service: self.network_service.clone(),
network_service_chain_id: self.network_service_chain_id,
sync_service: self.sync_service.clone(),
runtime_service: self.runtime_service.clone(),
transactions_service: self.transactions_service.clone(),
Expand Down Expand Up @@ -355,6 +357,7 @@ impl<TPlat: platform::PlatformRef, TChain> Client<TPlat, TChain> {
platform,
public_api_chains: slab::Slab::new(),
chains_by_key: None,
network_service: None,
}
}

Expand Down Expand Up @@ -722,6 +725,7 @@ impl<TPlat: platform::PlatformRef, TChain> Client<TPlat, TChain> {
start_services(
log_name.clone(),
&self.platform,
&mut self.network_service,
runtime_code_hint,
genesis_block_header,
usize::from(chain_spec.block_number_bytes()),
Expand Down Expand Up @@ -861,14 +865,9 @@ impl<TPlat: platform::PlatformRef, TChain> Client<TPlat, TChain> {
self.platform
.spawn_task("network-service-add-initial-topology".into(), {
let network_service = services.network_service.clone();
let network_service_chain_id = services.network_service_chain_id;
async move {
network_service
.discover(network_service_chain_id, known_nodes, false)
.await;
network_service
.discover(network_service_chain_id, bootstrap_nodes, true)
.await;
network_service.discover(known_nodes, false).await;
network_service.discover(bootstrap_nodes, true).await;
}
.boxed()
});
Expand Down Expand Up @@ -897,10 +896,7 @@ impl<TPlat: platform::PlatformRef, TChain> Client<TPlat, TChain> {
service_starter.start(json_rpc_service::StartConfig {
platform: self.platform.clone(),
sync_service: services.sync_service.clone(),
network_service: (
services.network_service.clone(),
services.network_service_chain_id,
),
network_service: services.network_service.clone(),
transactions_service: services.transactions_service.clone(),
runtime_service: services.runtime_service.clone(),
chain_spec: &chain_spec,
Expand Down Expand Up @@ -1085,77 +1081,74 @@ enum StartServicesChainTy<'a, TPlat: platform::PlatformRef> {
fn start_services<TPlat: platform::PlatformRef>(
log_name: String,
platform: &TPlat,
network_service: &mut Option<Arc<network_service::NetworkService<TPlat>>>,
runtime_code_hint: Option<database::DatabaseContentRuntimeCodeHint>,
genesis_block_scale_encoded_header: Vec<u8>,
block_number_bytes: usize,
fork_id: Option<String>,
config: StartServicesChainTy<'_, TPlat>,
network_identify_agent_version: String,
) -> ChainServices<TPlat> {
// The network service is responsible for connecting to the peer-to-peer network.
let (network_service, network_service_chain_ids) =
let network_service = network_service.get_or_insert_with(|| {
network_service::NetworkService::new(network_service::Config {
platform: platform.clone(),
identify_agent_version: network_identify_agent_version,
connections_open_pool_size: 5,
connections_open_pool_restore_delay: Duration::from_secs(1),
chains: vec![network_service::ConfigChain {
log_name: log_name.clone(),
num_out_slots: 4,
grandpa_protocol_finalized_block_height: if let StartServicesChainTy::RelayChain {
chain_information,
} = &config
{
if matches!(
chain_information.as_ref().finality,
chain::chain_information::ChainInformationFinalityRef::Grandpa { .. }
) {
Some(chain_information.as_ref().finalized_block_header.number)
} else {
None
}
chains_capacity: 1,
})
});

let network_service_chain = network_service.add_chain(network_service::ConfigChain {
log_name: log_name.clone(),
num_out_slots: 4,
grandpa_protocol_finalized_block_height: if let StartServicesChainTy::RelayChain {
chain_information,
} = &config
{
if matches!(
chain_information.as_ref().finality,
chain::chain_information::ChainInformationFinalityRef::Grandpa { .. }
) {
Some(chain_information.as_ref().finalized_block_header.number)
} else {
None
}
} else {
// Parachains never use GrandPa.
None
},
genesis_block_hash: header::hash_from_scale_encoded_header(
&genesis_block_scale_encoded_header,
),
best_block: match &config {
StartServicesChainTy::RelayChain { chain_information } => (
chain_information.as_ref().finalized_block_header.number,
chain_information
.as_ref()
.finalized_block_header
.hash(block_number_bytes),
),
StartServicesChainTy::Parachain {
finalized_block_header,
..
} => {
if let Ok(decoded) = header::decode(finalized_block_header, block_number_bytes) {
(
decoded.number,
header::hash_from_scale_encoded_header(finalized_block_header),
)
} else {
// Parachains never use GrandPa.
None
},
genesis_block_hash: header::hash_from_scale_encoded_header(
&genesis_block_scale_encoded_header,
),
best_block: match &config {
StartServicesChainTy::RelayChain { chain_information } => (
chain_information.as_ref().finalized_block_header.number,
chain_information
.as_ref()
.finalized_block_header
.hash(block_number_bytes),
),
StartServicesChainTy::Parachain {
finalized_block_header,
..
} => {
if let Ok(decoded) =
header::decode(finalized_block_header, block_number_bytes)
{
(
decoded.number,
header::hash_from_scale_encoded_header(finalized_block_header),
)
} else {
(
0,
header::hash_from_scale_encoded_header(
&genesis_block_scale_encoded_header,
),
)
}
}
},
fork_id,
block_number_bytes,
}],
});

let network_service_chain_id = network_service_chain_ids.into_iter().next().unwrap();
(
0,
header::hash_from_scale_encoded_header(&genesis_block_scale_encoded_header),
)
}
}
},
fork_id,
block_number_bytes,
});

let (sync_service, runtime_service) = match config {
StartServicesChainTy::Parachain {
Expand All @@ -1173,7 +1166,7 @@ fn start_services<TPlat: platform::PlatformRef>(
platform: platform.clone(),
log_name: log_name.clone(),
block_number_bytes,
network_service: (network_service.clone(), network_service_chain_id),
network_service: network_service_chain.clone(),
chain_type: sync_service::ConfigChainType::Parachain(
sync_service::ConfigParachain {
finalized_block_header,
Expand Down Expand Up @@ -1209,7 +1202,7 @@ fn start_services<TPlat: platform::PlatformRef>(
log_name: log_name.clone(),
block_number_bytes,
platform: platform.clone(),
network_service: (network_service.clone(), network_service_chain_id),
network_service: network_service_chain.clone(),
chain_type: sync_service::ConfigChainType::RelayChain(
sync_service::ConfigRelayChain {
chain_information: chain_information.clone(),
Expand Down Expand Up @@ -1249,16 +1242,15 @@ fn start_services<TPlat: platform::PlatformRef>(
platform: platform.clone(),
sync_service: sync_service.clone(),
runtime_service: runtime_service.clone(),
network_service: (network_service.clone(), network_service_chain_id),
network_service: network_service_chain.clone(),
max_pending_transactions: NonZeroU32::new(64).unwrap(),
max_concurrent_downloads: NonZeroU32::new(3).unwrap(),
max_concurrent_validations: NonZeroU32::new(2).unwrap(),
},
));

ChainServices {
network_service,
network_service_chain_id,
network_service: network_service_chain,
runtime_service,
sync_service,
transactions_service,
Expand Down
Loading

0 comments on commit 89fd7d8

Please sign in to comment.