Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jwk #2: ensure jwk txns are expected in consensus #11855

Merged
merged 29 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions consensus/safety-rules/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ repository = { workspace = true }
rust-version = { workspace = true }

[dependencies]
anyhow = { workspace = true }
aptos-config = { workspace = true }
aptos-consensus-types = { workspace = true }
aptos-crypto = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion consensus/safety-rules/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod process;
mod remote_service;
mod safety_rules;
mod safety_rules_2chain;
mod safety_rules_manager;
pub mod safety_rules_manager;
mod serializer;
mod t_safety_rules;
mod thread;
Expand Down
14 changes: 14 additions & 0 deletions consensus/safety-rules/src/safety_rules_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use crate::{
thread::ThreadService,
SafetyRules, TSafetyRules,
};
use anyhow::anyhow;
use aptos_config::config::{InitialSafetyRulesConfig, SafetyRulesConfig, SafetyRulesService};
use aptos_crypto::bls12381::PrivateKey;
use aptos_global_constants::CONSENSUS_KEY;
use aptos_infallible::RwLock;
use aptos_secure_storage::{KVStorage, Storage};
use std::{net::SocketAddr, sync::Arc};
Expand Down Expand Up @@ -73,6 +76,17 @@ pub fn storage(config: &SafetyRulesConfig) -> PersistentSafetyStorage {
}
}

pub fn load_consensus_key_from_secure_storage(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we already have something for DKG?

config: &SafetyRulesConfig,
) -> anyhow::Result<PrivateKey> {
let storage: Storage = (&config.backend).into();
let storage = Box::new(storage);
let response = storage.get::<PrivateKey>(CONSENSUS_KEY).map_err(|e| {
anyhow!("load_consensus_key_from_secure_storage failed with storage read error: {e}")
})?;
Ok(response.value)
}

enum SafetyRulesWrapper {
Local(Arc<RwLock<SafetyRules>>),
Process(ProcessService),
Expand Down
9 changes: 8 additions & 1 deletion consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use aptos_logger::{debug, info};
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_types::{
epoch_state::EpochState,
on_chain_config::{DagConsensusConfigV1, ValidatorTxnConfig},
on_chain_config::{DagConsensusConfigV1, FeatureFlag, Features, ValidatorTxnConfig},
validator_signer::ValidatorSigner,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -330,6 +330,7 @@ pub struct DagBootstrapper {
quorum_store_enabled: bool,
vtxn_config: ValidatorTxnConfig,
executor: BoundedExecutor,
features: Features,
}

impl DagBootstrapper {
Expand All @@ -352,6 +353,7 @@ impl DagBootstrapper {
quorum_store_enabled: bool,
vtxn_config: ValidatorTxnConfig,
executor: BoundedExecutor,
features: Features,
) -> Self {
Self {
self_peer,
Expand All @@ -371,6 +373,7 @@ impl DagBootstrapper {
quorum_store_enabled,
vtxn_config,
executor,
features,
}
}

Expand Down Expand Up @@ -557,6 +560,7 @@ impl DagBootstrapper {
fetch_requester,
self.config.node_payload_config.clone(),
self.vtxn_config.clone(),
self.features.clone(),
);
let fetch_handler = FetchRequestHandler::new(dag_store.clone(), self.epoch_state.clone());

Expand Down Expand Up @@ -647,6 +651,8 @@ pub(super) fn bootstrap_dag_for_test(
UnboundedReceiver<OrderedBlocks>,
) {
let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
let mut features = Features::default();
features.enable(FeatureFlag::RECONFIGURE_WITH_DKG);
let bootstraper = DagBootstrapper::new(
self_peer,
DagConsensusConfig::default(),
Expand All @@ -665,6 +671,7 @@ pub(super) fn bootstrap_dag_for_test(
false,
ValidatorTxnConfig::default_enabled(),
BoundedExecutor::new(2, Handle::current()),
features,
);

let (_base_state, handler, fetch_service) = bootstraper.full_bootstrap();
Expand Down
41 changes: 28 additions & 13 deletions consensus/src/dag/rb_handler.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::dag::{
dag_fetcher::TFetchRequester,
dag_network::RpcHandler,
dag_store::Dag,
errors::NodeBroadcastHandleError,
observability::{
logging::{LogEvent, LogSchema},
tracing::{observe_node, NodeStage},
use crate::{
dag::{
dag_fetcher::TFetchRequester,
dag_network::RpcHandler,
dag_store::Dag,
errors::NodeBroadcastHandleError,
observability::{
logging::{LogEvent, LogSchema},
tracing::{observe_node, NodeStage},
},
storage::DAGStorage,
types::{Node, NodeCertificate, Vote},
NodeId,
},
storage::DAGStorage,
types::{Node, NodeCertificate, Vote},
NodeId,
util::is_vtxn_expected,
};
use anyhow::{bail, ensure};
use aptos_config::config::DagPayloadConfig;
use aptos_consensus_types::common::{Author, Round};
use aptos_infallible::RwLock;
use aptos_logger::{debug, error};
use aptos_types::{
epoch_state::EpochState, on_chain_config::ValidatorTxnConfig,
validator_signer::ValidatorSigner, validator_txn::ValidatorTransaction,
epoch_state::EpochState,
on_chain_config::{Features, ValidatorTxnConfig},
validator_signer::ValidatorSigner,
validator_txn::ValidatorTransaction,
};
use async_trait::async_trait;
use std::{collections::BTreeMap, mem, sync::Arc};
Expand All @@ -35,6 +40,7 @@ pub(crate) struct NodeBroadcastHandler {
fetch_requester: Arc<dyn TFetchRequester>,
payload_config: DagPayloadConfig,
vtxn_config: ValidatorTxnConfig,
features: Features,
}

impl NodeBroadcastHandler {
Expand All @@ -46,6 +52,7 @@ impl NodeBroadcastHandler {
fetch_requester: Arc<dyn TFetchRequester>,
payload_config: DagPayloadConfig,
vtxn_config: ValidatorTxnConfig,
features: Features,
) -> Self {
let epoch = epoch_state.epoch;
let votes_by_round_peer = read_votes_from_storage(&storage, epoch);
Expand All @@ -59,6 +66,7 @@ impl NodeBroadcastHandler {
fetch_requester,
payload_config,
vtxn_config,
features,
}
}

Expand Down Expand Up @@ -87,6 +95,13 @@ impl NodeBroadcastHandler {
fn validate(&self, node: Node) -> anyhow::Result<Node> {
let num_vtxns = node.validator_txns().len() as u64;
ensure!(num_vtxns <= self.vtxn_config.per_block_limit_txn_count());
for vtxn in node.validator_txns() {
ensure!(
is_vtxn_expected(&self.features, vtxn),
"unexpected validator transaction: {:?}",
vtxn.topic()
);
}
let vtxn_total_bytes = node
.validator_txns()
.iter()
Expand Down
2 changes: 0 additions & 2 deletions consensus/src/dag/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ async fn test_dag_e2e() {
let runtime = consensus_runtime();
let mut playground = NetworkPlayground::new(runtime.handle().clone());
let (signers, validators) = random_validator_verifier(num_nodes, None, false);

let (nodes, mut ordered_node_receivers) = bootstrap_nodes(&mut playground, signers, validators);
for node in nodes {
runtime.spawn(node.start());
Expand All @@ -229,7 +228,6 @@ async fn test_dag_e2e() {
}
let first = all_ordered.first().unwrap();
assert_gt!(first.len(), 0, "must order nodes");
debug!("Nodes: {:?}", first);
for a in all_ordered.iter() {
assert_eq!(a.len(), first.len(), "length should match");
assert_eq!(a, first);
Expand Down
10 changes: 8 additions & 2 deletions consensus/src/dag/tests/rb_handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ use crate::dag::{
use aptos_config::config::DagPayloadConfig;
use aptos_infallible::RwLock;
use aptos_types::{
aggregate_signature::PartialSignatures, epoch_state::EpochState,
on_chain_config::ValidatorTxnConfig, validator_verifier::random_validator_verifier,
aggregate_signature::PartialSignatures,
epoch_state::EpochState,
on_chain_config::{Features, ValidatorTxnConfig},
validator_verifier::random_validator_verifier,
};
use claims::{assert_ok, assert_ok_eq};
use futures::executor::block_on;
Expand Down Expand Up @@ -68,6 +70,7 @@ async fn test_node_broadcast_receiver_succeed() {
Arc::new(MockFetchRequester {}),
DagPayloadConfig::default(),
ValidatorTxnConfig::default_disabled(),
Features::default(),
);

let expected_result = Vote::new(
Expand Down Expand Up @@ -114,6 +117,7 @@ async fn test_node_broadcast_receiver_failure() {
Arc::new(MockFetchRequester {}),
DagPayloadConfig::default(),
ValidatorTxnConfig::default_disabled(),
Features::default(),
)
})
.collect();
Expand Down Expand Up @@ -197,6 +201,7 @@ async fn test_node_broadcast_receiver_storage() {
Arc::new(MockFetchRequester {}),
DagPayloadConfig::default(),
ValidatorTxnConfig::default_disabled(),
Features::default(),
);
let sig = rb_receiver.process(node).await.expect("must succeed");

Expand All @@ -213,6 +218,7 @@ async fn test_node_broadcast_receiver_storage() {
Arc::new(MockFetchRequester {}),
DagPayloadConfig::default(),
ValidatorTxnConfig::default_disabled(),
Features::default(),
);
assert_ok!(rb_receiver.gc_before_round(2));
assert_eq!(storage.get_votes().unwrap().len(), 0);
Expand Down
16 changes: 15 additions & 1 deletion consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
network_sender: NetworkSender,
payload_client: Arc<dyn PayloadClient>,
payload_manager: Arc<PayloadManager>,
features: Features,
) {
let epoch = epoch_state.epoch;
info!(
Expand Down Expand Up @@ -930,6 +931,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
onchain_consensus_config,
buffered_proposal_tx,
self.config.clone(),
features,
);

round_manager.init(last_vote).await;
Expand Down Expand Up @@ -975,7 +977,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {

let onchain_consensus_config: anyhow::Result<OnChainConsensusConfig> = payload.get();
let onchain_execution_config: anyhow::Result<OnChainExecutionConfig> = payload.get();
let features = payload.get::<Features>().ok().unwrap_or_default();
let features = payload.get::<Features>();

if let Err(error) = &onchain_consensus_config {
error!("Failed to read on-chain consensus config {}", error);
Expand All @@ -985,11 +987,17 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
error!("Failed to read on-chain execution config {}", error);
}

if let Err(error) = &features {
error!("Failed to read on-chain features {}", error);
}

self.epoch_state = Some(epoch_state.clone());

let consensus_config = onchain_consensus_config.unwrap_or_default();
let execution_config = onchain_execution_config
.unwrap_or_else(|_| OnChainExecutionConfig::default_if_missing());
let features = features.unwrap_or_default();

let (network_sender, payload_client, payload_manager) = self
.initialize_shared_component(
&epoch_state,
Expand All @@ -1006,6 +1014,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
network_sender,
payload_client,
payload_manager,
features,
)
.await
} else {
Expand All @@ -1015,6 +1024,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
network_sender,
payload_client,
payload_manager,
features,
)
.await
}
Expand Down Expand Up @@ -1061,6 +1071,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
network_sender: NetworkSender,
payload_client: Arc<dyn PayloadClient>,
payload_manager: Arc<PayloadManager>,
features: Features,
) {
match self.storage.start() {
LivenessStorageData::FullRecoveryData(initial_data) => {
Expand All @@ -1072,6 +1083,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
network_sender,
payload_client,
payload_manager,
features,
)
.await
},
Expand All @@ -1095,6 +1107,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
network_sender: NetworkSender,
payload_client: Arc<dyn PayloadClient>,
payload_manager: Arc<PayloadManager>,
features: Features,
) {
let epoch = epoch_state.epoch;

Expand Down Expand Up @@ -1147,6 +1160,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
onchain_consensus_config.quorum_store_enabled(),
onchain_consensus_config.effective_validator_txn_config(),
self.bounded_executor.clone(),
features,
);

let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 10, None);
Expand Down
Loading
Loading