fix(validator-node): only submit checkpoint if the leader (#4294)

Description
---
- changes the consensus manager to submit checkpoints only when the node is the leader
- emojifies some stdout logs
- minor fixes to the log4rs sample config

Motivation and Context
---
Only one checkpoint should be submitted per interval, so to keep things simple the leader is the one that submits it.
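
The gist of that rule, as a minimal runnable sketch — the `View` struct, its fields, and the interval constant below are illustrative stand-ins, not the dan_layer API (the real check lands in `consensus_worker.rs` further down):

```rust
/// Minimal sketch of the leader-gated checkpoint rule (illustrative types,
/// not the actual dan_layer API).
const CHECKPOINT_INTERVAL: u64 = 50;

struct View {
    view_id: u64,
    is_leader: bool,
}

fn maybe_submit_checkpoint(view: &View) {
    // Every committee member evaluates this condition, but only the current
    // leader acts on it, so exactly one checkpoint goes out per interval.
    if view.is_leader && view.view_id % CHECKPOINT_INTERVAL == 0 {
        println!("submitting checkpoint at view {}", view.view_id);
    }
}

fn main() {
    maybe_submit_checkpoint(&View { view_id: 100, is_leader: true });  // submits
    maybe_submit_checkpoint(&View { view_id: 100, is_leader: false }); // stays quiet
}
```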

How Has This Been Tested?
---
Manually
sdbondi authored Jul 20, 2022
1 parent a897278 commit fd55107
Showing 4 changed files with 46 additions and 20 deletions.
18 changes: 12 additions & 6 deletions applications/tari_validator_node/log4rs_sample.yml
```diff
@@ -19,7 +19,7 @@ appenders:
       pattern: "{d(%H:%M)} {h({l}):5} {m}{n}"
     filters:
       - kind: threshold
-        level: warn
+        level: info
 
   # An appender named "network" that writes to a file with a custom pattern encoder
   network:
```
@@ -102,24 +102,30 @@ loggers:

```yaml
  tari::application:
    level: info
    appenders:
      - base_layer
      - network
      - dan_layer
      - other
      - stdout
    additive: false

  tari::validator_node:
    level: info
    appenders:
      - dan_layer
      - base_layer
      - network
      - other
      - stdout
    additive: false

  tari::dan:
    level: info
    appenders:
      - dan_layer
      - stdout
    additive: false

  tari::dan_layer:
    level: info
    appenders:
      - dan_layer
      - stdout
    additive: false

  # Route log events sent to the "core" logger to the "base_layer" appender
```
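
For context, a binary loads a log4rs config like this sample at startup. A minimal sketch using the standard `log4rs::init_file` entry point — the file path and log target here are assumptions, not necessarily what tari_validator_node does:

```rust
fn main() {
    // Load the YAML config shown above; Default::default() supplies the
    // standard log4rs deserializers.
    log4rs::init_file("log4rs_sample.yml", Default::default())
        .expect("failed to initialise log4rs");

    // With the stdout threshold lowered from `warn` to `info`, info-level
    // logs like the emojified ones in this PR now reach the console.
    log::info!(target: "tari::validator_node", "ℹ️ logging initialised");
}
```
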
33 changes: 25 additions & 8 deletions applications/tari_validator_node/src/contract_worker_manager.rs
```diff
@@ -139,13 +139,13 @@ impl ContractWorkerManager {
 
         info!(
             target: LOG_TARGET,
-            "constitution_auto_accept is {}", self.config.constitution_auto_accept
+            "ℹ️ constitution_auto_accept is {}", self.config.constitution_auto_accept
         );
 
         if !self.config.scan_for_assets {
             info!(
                 target: LOG_TARGET,
-                "scan_for_assets set to false. Contract scanner is shutting down."
+                "⚠️ scan_for_assets turned OFF. Contract scanner is shutting down."
             );
             self.shutdown.await;
             return Ok(());
```
```diff
@@ -160,8 +160,21 @@ impl ContractWorkerManager {
         let tip = self.base_node_client.get_tip_info().await?;
         let new_contracts = self.scan_for_new_contracts(&tip).await?;
 
-        if self.config.constitution_auto_accept {
-            self.accept_contracts(new_contracts).await?;
+        if !new_contracts.is_empty() {
+            if self.config.constitution_auto_accept {
+                info!(
+                    target: LOG_TARGET,
+                    "ℹ️ Auto accepting {} new contract(s).",
+                    new_contracts.len()
+                );
+                self.accept_contracts(new_contracts).await?;
+            } else {
+                info!(
+                    target: LOG_TARGET,
+                    "ℹ️ Auto accept is OFF. {} new contract(s) will require manual acceptance.",
+                    new_contracts.len()
+                );
+            }
         }
 
         self.validate_contract_activity(&tip).await?;
```
```diff
@@ -212,12 +225,15 @@ impl ContractWorkerManager {
         // Abandoned contracts can be revived by the VNC so they should continue to monitor them
         let mut active_contracts = self.global_db.get_contracts_with_state(ContractState::Active)?;
         active_contracts.append(&mut self.global_db.get_contracts_with_state(ContractState::Abandoned)?);
+        info!(
+            target: LOG_TARGET,
+            "ℹ️ ready to work on {} active contract(s)",
+            active_contracts.len()
+        );
 
         for contract in active_contracts {
             let contract_id = FixedHash::try_from(contract.contract_id)?;
 
-            info!(target: LOG_TARGET, "Starting contract {}", contract_id.to_hex());
-
             let constitution = ContractConstitution::from_binary(&*contract.constitution).map_err(|error| {
                 WorkerManagerError::DataCorruption {
                     details: error.to_string(),
```
```diff
@@ -235,7 +251,7 @@ impl ContractWorkerManager {
         for contract in new_contracts {
             info!(
                 target: LOG_TARGET,
-                "Posting acceptance transaction for contract {}", contract.contract_id
+                "ℹ️ Posting acceptance transaction for contract {}", contract.contract_id
             );
             self.post_contract_acceptance(&contract).await?;
 
```
```diff
@@ -317,7 +333,7 @@ impl ContractWorkerManager {
     ) -> Result<Vec<ActiveContract>, WorkerManagerError> {
         info!(
             target: LOG_TARGET,
-            "Scanning base layer (tip: {}) for new assets", tip.height_of_longest_chain
+            "🔍 Scanning base layer (tip: {}) for new assets", tip.height_of_longest_chain
         );
 
         let outputs = self
```
```diff
@@ -409,6 +425,7 @@ impl ContractWorkerManager {
     }
 
     fn spawn_asset_worker(&self, contract_id: FixedHash, constitution: &ContractConstitution) -> Arc<AtomicBool> {
+        info!(target: LOG_TARGET, "🚀 starting work on contract {}", contract_id);
         let node_identity = self.identity.clone();
         let mempool = self.mempool.clone();
         let handles = self.handles.clone();
```
11 changes: 7 additions & 4 deletions dan_layer/core/src/workers/consensus_worker.rs
```diff
@@ -133,7 +133,9 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
             .unwrap_or_else(|| ViewId(0));
         info!(
             target: LOG_TARGET,
-            "Consensus worker started for asset '{}'. Tip: {}", self.asset_definition.contract_id, self.current_view_id
+            "🚀 Consensus worker started for asset '{}'. Tip: {}",
+            self.asset_definition.contract_id,
+            self.current_view_id
         );
         let starting_view = self.current_view_id;
         while !stop.load(Ordering::Relaxed) {
```
```diff
@@ -312,10 +314,11 @@ impl<'a, T: ServiceSpecification<Addr = PublicKey>> ConsensusWorkerProcessor<'a,
             self.worker.asset_definition.contract_id,
             self.worker.committee_manager.current_committee()?.clone(),
         );
+        let current_view = self.worker.get_current_view()?;
         let res = state
             .next_event(
                 self.worker.timeout,
-                &self.worker.get_current_view()?,
+                &current_view,
                 &mut self.worker.inbound_connections,
                 &mut self.worker.outbound_service,
                 unit_of_work.clone(),
```
```diff
@@ -326,9 +329,9 @@ impl<'a, T: ServiceSpecification<Addr = PublicKey>> ConsensusWorkerProcessor<'a,
             unit_of_work.commit()?;
             if let Some(mut state_tx) = self.worker.state_db_unit_of_work.take() {
                 state_tx.commit()?;
-                let signatures = state.collected_checkpoint_signatures();
                 // TODO: Read checkpoint interval from constitution
-                if self.worker.current_view_id.as_u64() % 50 == 0 {
+                if current_view.is_leader() && current_view.view_id().as_u64() % 50 == 0 {
+                    let signatures = state.collected_checkpoint_signatures();
                     let checkpoint_number = self.chain_db.get_current_checkpoint_number()?;
                     self.worker
                         .checkpoint_manager
```
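
The property this hunk enforces is easy to pin down in a test. A sketch with stand-in names (none of these are from the codebase):

```rust
fn should_submit_checkpoint(is_leader: bool, view_id: u64, interval: u64) -> bool {
    // Mirrors the new condition: only the leader, and only on the interval.
    is_leader && view_id % interval == 0
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn only_the_leader_submits_and_only_on_the_interval() {
        assert!(should_submit_checkpoint(true, 50, 50));
        assert!(!should_submit_checkpoint(false, 50, 50)); // follower: never
        assert!(!should_submit_checkpoint(true, 51, 50));  // off-interval: no
    }
}
```
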
4 changes: 2 additions & 2 deletions dan_layer/core/src/workers/states/prepare.rs
```diff
@@ -171,7 +171,7 @@ impl<TSpecification: ServiceSpecification> Prepare<TSpecification> {
 
         // TODO: This might need to be checked in the QC rather
         if self.received_new_view_messages.contains_key(sender) {
-            println!("Already received message from {:?}", sender);
+            warn!("Already received message from {:?}", sender);
             return Ok(None);
         }
 
```

Expand Down Expand Up @@ -237,7 +237,7 @@ impl<TSpecification: ServiceSpecification> Prepare<TSpecification> {
unimplemented!("Empty message");
}
if from != view_leader {
println!("Message not from leader");
warn!("Message not from leader");
return Ok(None);
}
let node = message.node().unwrap();
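
One note on the `println!` → `warn!` swap: a bare `warn!` defaults its target to the module path, whereas the logs elsewhere in this PR pass an explicit target that the log4rs loggers section can route. A sketch of the explicit form — the `LOG_TARGET` value here is an assumption, not the constant from prepare.rs:

```rust
use log::warn;

// Hypothetical target value; the real constant lives in prepare.rs.
const LOG_TARGET: &str = "tari::dan::workers::states::prepare";

fn report_duplicate(sender: &str) {
    // Unlike println!, this goes through the logging framework, so the
    // log4rs config above decides where it lands (file, stdout, or both).
    warn!(target: LOG_TARGET, "Already received message from {:?}", sender);
}
```
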
