diff --git a/Cargo.lock b/Cargo.lock index 02613f7484d9d..d1bdd32c4d426 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2496,15 +2496,37 @@ dependencies = [ name = "aptos-jwk-consensus" version = "0.1.0" dependencies = [ + "anyhow", + "aptos-bitvec", + "aptos-bounded-executor", + "aptos-channels", "aptos-config", + "aptos-consensus-types", + "aptos-crypto", + "aptos-enum-conversion-derive", "aptos-event-notifications", + "aptos-global-constants", + "aptos-infallible", + "aptos-logger", + "aptos-metrics-core", "aptos-network", + "aptos-reliable-broadcast", "aptos-runtimes", + "aptos-time-service", "aptos-types", "aptos-validator-transaction-pool", + "async-trait", + "bytes", + "futures", + "futures-channel", "futures-util", + "move-core-types", + "once_cell", + "reqwest", "serde", + "serde_json", "tokio", + "tokio-retry", ] [[package]] diff --git a/aptos-node/src/network.rs b/aptos-node/src/network.rs index 4167e88ffe09f..9c95ec7a30d0c 100644 --- a/aptos-node/src/network.rs +++ b/aptos-node/src/network.rs @@ -10,7 +10,7 @@ use aptos_config::{ use aptos_consensus::network_interface::ConsensusMsg; use aptos_dkg_runtime::DKGMessage; use aptos_event_notifications::EventSubscriptionService; -use aptos_jwk_consensus::JWKConsensusMsg; +use aptos_jwk_consensus::types::JWKConsensusMsg; use aptos_logger::debug; use aptos_mempool::network::MempoolSyncMsg; use aptos_network::{ diff --git a/crates/aptos-jwk-consensus/Cargo.toml b/crates/aptos-jwk-consensus/Cargo.toml index da4d6a204ff20..f77612bbcddfa 100644 --- a/crates/aptos-jwk-consensus/Cargo.toml +++ b/crates/aptos-jwk-consensus/Cargo.toml @@ -13,15 +13,40 @@ repository = { workspace = true } rust-version = { workspace = true } [dependencies] +anyhow = { workspace = true } +aptos-bitvec = { workspace = true } +aptos-bounded-executor = { workspace = true } +aptos-channels = { workspace = true } aptos-config = { workspace = true } +aptos-consensus-types = { workspace = true } +aptos-crypto = { workspace = true } +aptos-enum-conversion-derive = { workspace = true } aptos-event-notifications = { workspace = true } +aptos-global-constants = { workspace = true } +aptos-infallible = { workspace = true } +aptos-logger = { workspace = true } +aptos-metrics-core = { workspace = true } aptos-network = { workspace = true } +aptos-reliable-broadcast = { workspace = true } aptos-runtimes = { workspace = true } +aptos-time-service = { workspace = true } aptos-types = { workspace = true } aptos-validator-transaction-pool = { workspace = true } +async-trait = { workspace = true } +bytes = { workspace = true } +futures = { workspace = true } +futures-channel = { workspace = true } futures-util = { workspace = true } +move-core-types = { workspace = true } +once_cell = { workspace = true } +reqwest = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } tokio = { workspace = true } +tokio-retry = { workspace = true } +[dev-dependencies] +aptos-types = { workspace = true, features = ["fuzzing"] } +aptos-validator-transaction-pool = { workspace = true, features = ["fuzzing"] } [features] smoke-test = [] diff --git a/crates/aptos-jwk-consensus/src/lib.rs b/crates/aptos-jwk-consensus/src/lib.rs index 1b873aeeba873..737b1a1f8a1ca 100644 --- a/crates/aptos-jwk-consensus/src/lib.rs +++ b/crates/aptos-jwk-consensus/src/lib.rs @@ -1,18 +1,12 @@ // Copyright © Aptos Foundation -use aptos_config::network_id::{NetworkId, PeerNetworkId}; +use crate::types::JWKConsensusMsg; use aptos_event_notifications::{ DbBackedOnChainConfig, EventNotificationListener, ReconfigNotificationListener, }; -use aptos_network::application::{ - error::Error, - interface::{NetworkClient, NetworkClientInterface, NetworkServiceEvents}, -}; -use aptos_types::PeerId; +use aptos_network::application::interface::{NetworkClient, NetworkServiceEvents}; use aptos_validator_transaction_pool::VTxnPoolState; use futures_util::StreamExt; -use serde::{Deserialize, Serialize}; -use std::time::Duration; use tokio::runtime::Runtime; #[allow(clippy::let_and_return)] @@ -35,30 +29,6 @@ pub fn start_jwk_consensus_runtime( runtime } -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct JWKConsensusMsg {} - -#[derive(Clone)] -pub struct JWKNetworkClient { - network_client: NetworkClient, -} - -impl> JWKNetworkClient { - pub fn new(network_client: NetworkClient) -> Self { - Self { network_client } - } - - pub async fn send_rpc( - &self, - peer: PeerId, - message: JWKConsensusMsg, - rpc_timeout: Duration, - ) -> Result { - let peer_network_id = PeerNetworkId::new(NetworkId::Validator, peer); - self.network_client - .send_to_peer_rpc(message, rpc_timeout, peer_network_id) - .await - } -} - +pub mod network; pub mod network_interface; +pub mod types; diff --git a/crates/aptos-jwk-consensus/src/network.rs b/crates/aptos-jwk-consensus/src/network.rs new file mode 100644 index 0000000000000..ffea47f1f2114 --- /dev/null +++ b/crates/aptos-jwk-consensus/src/network.rs @@ -0,0 +1,184 @@ +// Copyright © Aptos Foundation + +use crate::{ + network_interface::{JWKConsensusNetworkClient, RPC}, + types::JWKConsensusMsg, +}; +use anyhow::bail; +use aptos_channels::{aptos_channel, message_queues::QueueStyle}; +use aptos_config::network_id::NetworkId; +use aptos_consensus_types::common::Author; +#[cfg(test)] +use aptos_infallible::RwLock; +use aptos_logger::warn; +use aptos_network::{ + application::interface::{NetworkClient, NetworkServiceEvents}, + protocols::network::{Event, RpcError}, + ProtocolId, +}; +use aptos_reliable_broadcast::RBNetworkSender; +use aptos_types::account_address::AccountAddress; +use bytes::Bytes; +use futures::Stream; +use futures_channel::oneshot; +use futures_util::{ + stream::{select, select_all, StreamExt}, + SinkExt, +}; +#[cfg(test)] +use std::sync::Arc; +use std::time::Duration; +use tokio::time::timeout; + +pub struct IncomingRpcRequest { + pub msg: JWKConsensusMsg, + pub sender: AccountAddress, + pub response_sender: Box, +} + +pub struct NetworkSender { + author: AccountAddress, + jwk_network_client: JWKConsensusNetworkClient>, + self_sender: aptos_channels::Sender>, +} + +impl NetworkSender { + pub fn new( + author: AccountAddress, + jwk_network_client: JWKConsensusNetworkClient>, + self_sender: aptos_channels::Sender>, + ) -> Self { + Self { + author, + jwk_network_client, + self_sender, + } + } +} + +#[async_trait::async_trait] +impl RBNetworkSender for NetworkSender { + async fn send_rb_rpc( + &self, + receiver: Author, + msg: JWKConsensusMsg, + time_limit: Duration, + ) -> anyhow::Result { + if receiver == self.author { + let (tx, rx) = oneshot::channel(); + let self_msg = Event::RpcRequest(receiver, msg, RPC[0], tx); + self.self_sender.clone().send(self_msg).await?; + if let Ok(Ok(Ok(bytes))) = timeout(time_limit, rx).await { + Ok(RPC[0].from_bytes(&bytes)?) + } else { + bail!("self rpc failed"); + } + } else { + let result = self + .jwk_network_client + .send_rpc(receiver, msg, time_limit) + .await?; + Ok(result) + } + } +} + +pub trait RpcResponseSender: Send + Sync { + fn send(&mut self, response: anyhow::Result); +} + +pub struct RealRpcResponseSender { + pub inner: Option>>, + pub protocol: ProtocolId, +} + +impl RpcResponseSender for RealRpcResponseSender { + fn send(&mut self, response: anyhow::Result) { + let rpc_response = response + .and_then(|msg| self.protocol.to_bytes(&msg).map(Bytes::from)) + .map_err(RpcError::ApplicationError); + if let Some(tx) = self.inner.take() { + let _ = tx.send(rpc_response); + } + } +} + +#[cfg(test)] +pub struct DummyRpcResponseSender { + pub rpc_response_collector: Arc>>>, +} + +#[cfg(test)] +impl DummyRpcResponseSender { + pub fn new(rpc_response_collector: Arc>>>) -> Self { + Self { + rpc_response_collector, + } + } +} + +#[cfg(test)] +impl RpcResponseSender for DummyRpcResponseSender { + fn send(&mut self, response: anyhow::Result) { + self.rpc_response_collector.write().push(response); + } +} + +pub struct NetworkReceivers { + pub rpc_rx: aptos_channel::Receiver, +} + +pub struct NetworkTask { + all_events: Box> + Send + Unpin>, + rpc_tx: aptos_channel::Sender, +} + +impl NetworkTask { + /// Establishes the initial connections with the peers and returns the receivers. + pub fn new( + network_service_events: NetworkServiceEvents, + self_receiver: aptos_channels::Receiver>, + ) -> (NetworkTask, NetworkReceivers) { + let (rpc_tx, rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 10, None); + + let network_and_events = network_service_events.into_network_and_events(); + if (network_and_events.values().len() != 1) + || !network_and_events.contains_key(&NetworkId::Validator) + { + panic!("The network has not been setup correctly for JWK consensus!"); + } + + // Collect all the network events into a single stream + let network_events: Vec<_> = network_and_events.into_values().collect(); + let network_events = select_all(network_events).fuse(); + let all_events = Box::new(select(network_events, self_receiver)); + + (NetworkTask { rpc_tx, all_events }, NetworkReceivers { + rpc_rx, + }) + } + + pub async fn start(mut self) { + while let Some(message) = self.all_events.next().await { + match message { + Event::RpcRequest(peer_id, msg, protocol, response_sender) => { + let req = IncomingRpcRequest { + msg, + sender: peer_id, + response_sender: Box::new(RealRpcResponseSender { + inner: Some(response_sender), + protocol, + }), + }; + + if let Err(e) = self.rpc_tx.push(peer_id, (peer_id, req)) { + warn!(error = ?e, "aptos channel closed"); + }; + }, + _ => { + // Ignore + }, + } + } + } +} diff --git a/crates/aptos-jwk-consensus/src/network_interface.rs b/crates/aptos-jwk-consensus/src/network_interface.rs index 3e29a1bec328c..5fe9ecf56f8df 100644 --- a/crates/aptos-jwk-consensus/src/network_interface.rs +++ b/crates/aptos-jwk-consensus/src/network_interface.rs @@ -1,6 +1,13 @@ // Copyright © Aptos Foundation -use aptos_network::ProtocolId; +use crate::types::JWKConsensusMsg; +use aptos_config::network_id::{NetworkId, PeerNetworkId}; +use aptos_network::{ + application::{error::Error, interface::NetworkClientInterface}, + ProtocolId, +}; +use move_core_types::account_address::AccountAddress as PeerId; +use std::time::Duration; /// Supported protocols in preferred order (from highest priority to lowest). pub const DIRECT_SEND: &[ProtocolId] = &[ @@ -15,3 +22,28 @@ pub const RPC: &[ProtocolId] = &[ ProtocolId::JWKConsensusRpcBcs, ProtocolId::JWKConsensusRpcJson, ]; + +#[derive(Clone)] +pub struct JWKConsensusNetworkClient { + network_client: NetworkClient, +} + +impl> + JWKConsensusNetworkClient +{ + pub fn new(network_client: NetworkClient) -> Self { + Self { network_client } + } + + pub async fn send_rpc( + &self, + peer: PeerId, + message: JWKConsensusMsg, + rpc_timeout: Duration, + ) -> Result { + let peer_network_id = PeerNetworkId::new(NetworkId::Validator, peer); + self.network_client + .send_to_peer_rpc(message, rpc_timeout, peer_network_id) + .await + } +} diff --git a/crates/aptos-jwk-consensus/src/types.rs b/crates/aptos-jwk-consensus/src/types.rs new file mode 100644 index 0000000000000..31223ec4c746f --- /dev/null +++ b/crates/aptos-jwk-consensus/src/types.rs @@ -0,0 +1,53 @@ +// Copyright © Aptos Foundation + +use aptos_crypto::bls12381::Signature; +use aptos_enum_conversion_derive::EnumConversion; +use aptos_reliable_broadcast::RBMessage; +use aptos_types::{ + account_address::AccountAddress, + jwks::{Issuer, ProviderJWKs}, +}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, EnumConversion, Deserialize, Serialize, PartialEq)] +pub enum JWKConsensusMsg { + ObservationRequest(ObservedUpdateRequest), + ObservationResponse(ObservedUpdateResponse), +} + +impl JWKConsensusMsg { + pub fn name(&self) -> &str { + match self { + JWKConsensusMsg::ObservationRequest(_) => "ObservationRequest", + JWKConsensusMsg::ObservationResponse(_) => "ObservationResponse", + } + } + + pub fn epoch(&self) -> u64 { + match self { + JWKConsensusMsg::ObservationRequest(request) => request.epoch, + JWKConsensusMsg::ObservationResponse(response) => response.epoch, + } + } +} + +impl RBMessage for JWKConsensusMsg {} + +#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] +pub struct ObservedUpdate { + pub author: AccountAddress, + pub observed: ProviderJWKs, + pub signature: Signature, +} + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub struct ObservedUpdateRequest { + pub epoch: u64, + pub issuer: Issuer, +} + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub struct ObservedUpdateResponse { + pub epoch: u64, + pub update: ObservedUpdate, +} diff --git a/crates/validator-transaction-pool/Cargo.toml b/crates/validator-transaction-pool/Cargo.toml index c3eac3de8d54c..c0604b9d34f4b 100644 --- a/crates/validator-transaction-pool/Cargo.toml +++ b/crates/validator-transaction-pool/Cargo.toml @@ -23,3 +23,6 @@ tokio = { workspace = true } [dev-dependencies] aptos-types = { workspace = true, features = ["fuzzing"] } + +[features] +fuzzing = [] diff --git a/crates/validator-transaction-pool/src/lib.rs b/crates/validator-transaction-pool/src/lib.rs index dbe9d3065a17b..5ed1cd10dd56c 100644 --- a/crates/validator-transaction-pool/src/lib.rs +++ b/crates/validator-transaction-pool/src/lib.rs @@ -22,6 +22,10 @@ impl TransactionFilter { } impl TransactionFilter { + pub fn empty() -> Self { + Self::PendingTxnHashSet(HashSet::new()) + } + pub fn should_exclude(&self, txn: &ValidatorTransaction) -> bool { match self { TransactionFilter::PendingTxnHashSet(set) => set.contains(&txn.hash()), @@ -87,6 +91,14 @@ impl VTxnPoolState { .lock() .pull(deadline, max_items, max_bytes, filter) } + + #[cfg(any(test, feature = "fuzzing"))] + pub fn dummy_txn_guard(&self) -> TxnGuard { + TxnGuard { + pool: self.inner.clone(), + seq_num: u64::MAX, + } + } } struct PoolItem {