Skip to content

Commit

Permalink
Make communication_for exit when we end a round (paritytech#313)
Browse files Browse the repository at this point in the history
* Make `communication_for` exit when we end a round

* Fix compilation
  • Loading branch information
bkchr authored Jul 5, 2019
1 parent d19d5b1 commit a5b223a
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 25 deletions.
3 changes: 2 additions & 1 deletion network/src/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
&self,
table: Arc<SharedTable>,
authorities: &[ValidatorId],
exit: exit_future::Exit,
) -> Self::BuildTableRouter {
let parent_hash = *table.consensus_parent_hash();
let local_session_key = table.session_key();
Expand All @@ -354,7 +355,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
let table_router_clone = table_router.clone();
let work = table_router.checked_statements()
.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) });
executor.spawn(work);
executor.spawn(work.select(exit).map(|_| ()).map_err(|_| ()));

table_router
});
Expand Down
10 changes: 5 additions & 5 deletions runtime/src/parachains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use parity_codec::{Decode, HasCompact};
use srml_support::{decl_storage, decl_module, fail, ensure};

use bitvec::{bitvec, BigEndian};
use sr_primitives::traits::{
Hash as HashT, BlakeTwo256, Member, CheckedConversion, Saturating, One, Zero,
};
use sr_primitives::traits::{Hash as HashT, BlakeTwo256, Member, CheckedConversion, Saturating, One};
use primitives::{Hash, Balance, parachain::{
self, Id as ParaId, Chain, DutyRoster, AttestedCandidate, Statement, AccountIdConversion,
ParachainDispatchOrigin, UpwardMessage, BlockIngressRoots,
Expand Down Expand Up @@ -243,9 +241,11 @@ decl_storage! {
config(parachains): Vec<(ParaId, Vec<u8>, Vec<u8>)>;
config(_phdata): PhantomData<T>;
build(|storage: &mut StorageOverlay, _: &mut ChildrenStorageOverlay, config: &GenesisConfig<T>| {
use sr_primitives::traits::Zero;

let mut p = config.parachains.clone();
p.sort_unstable_by_key(|&(ref id, _, _)| id.clone());
p.dedup_by_key(|&mut (ref id, _, _)| id.clone());
p.sort_unstable_by_key(|&(ref id, _, _)| *id);
p.dedup_by_key(|&mut (ref id, _, _)| *id);

let only_ids: Vec<_> = p.iter().map(|&(ref id, _, _)| id).cloned().collect();

Expand Down
39 changes: 20 additions & 19 deletions validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ pub trait Network {
&self,
table: Arc<SharedTable>,
authorities: &[SessionKey],
exit: exit_future::Exit,
) -> Self::BuildTableRouter;
}

Expand Down Expand Up @@ -313,11 +314,14 @@ impl<C, N, P> ParachainValidation<C, N, P> where
let (group_info, local_duty) = make_group_info(
duty_roster,
&authorities,
sign_with.public().into(),
sign_with.public(),
)?;

info!("Starting parachain attestation session on top of parent {:?}. Local parachain duty is {:?}",
parent_hash, local_duty.validation);
info!(
"Starting parachain attestation session on top of parent {:?}. Local parachain duty is {:?}",
parent_hash,
local_duty.validation,
);

let active_parachains = self.client.runtime_api().active_parachains(&id)?;

Expand All @@ -331,25 +335,23 @@ impl<C, N, P> ParachainValidation<C, N, P> where
self.extrinsic_store.clone(),
max_block_data_size,
));

let (_drop_signal, exit) = exit_future::signal();

let router = self.network.communication_for(
table.clone(),
&authorities,
exit.clone(),
);

let drop_signal = match local_duty.validation {
Chain::Parachain(id) => Some(self.launch_work(
parent_hash,
id,
router,
max_block_data_size,
)),
Chain::Relay => None,
};
if let Chain::Parachain(id) = local_duty.validation {
self.launch_work(parent_hash, id, router, max_block_data_size, exit);
}

let tracker = Arc::new(AttestationTracker {
table,
started: Instant::now(),
_drop_signal: drop_signal
_drop_signal,
});

live_instances.insert(parent_hash, tracker.clone());
Expand All @@ -369,10 +371,10 @@ impl<C, N, P> ParachainValidation<C, N, P> where
validation_para: ParaId,
build_router: N::BuildTableRouter,
max_block_data_size: Option<u64>,
) -> exit_future::Signal {
exit: exit_future::Exit,
) {
use extrinsic_store::Data;

let (signal, exit) = exit_future::signal();
let (collators, client) = (self.collators.clone(), self.client.clone());
let extrinsic_store = self.extrinsic_store.clone();

Expand Down Expand Up @@ -428,16 +430,15 @@ impl<C, N, P> ParachainValidation<C, N, P> where
.then(|_| Ok(()));

// spawn onto thread pool.
if let Err(_) = self.handle.execute(Box::new(cancellable_work)) {
if self.handle.execute(Box::new(cancellable_work)).is_err() {
error!("Failed to spawn cancellable work task");
}
signal
}
}

/// Parachain validation for a single block.
struct AttestationTracker {
_drop_signal: Option<exit_future::Signal>,
_drop_signal: exit_future::Signal,
table: Arc<SharedTable>,
started: Instant,
}
Expand Down Expand Up @@ -544,7 +545,7 @@ impl<C, N, P, SC, TxApi> consensus::Environment<Block> for ProposerFactory<C, N,
parent_id,
parent_number: parent_header.number,
transaction_pool: self.transaction_pool.clone(),
slot_duration: self.aura_slot_duration.clone(),
slot_duration: self.aura_slot_duration,
})
}
}
Expand Down

0 comments on commit a5b223a

Please sign in to comment.