Skip to content

Commit

Permalink
jwk #3: jwk consensus deps, network, and type defs (aptos-labs#11856)
Browse files Browse the repository at this point in the history
* jwk types update

* update

* update

* jwk txn and execution

* consensus ensure jwk txns are expected

* update

* jwk consensus network type defs

* update cargo.toml

* update

* update

* update

* lint

* update
  • Loading branch information
zjma authored Feb 5, 2024
1 parent 0503b18 commit dae6f11
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 36 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion aptos-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aptos_config::{
use aptos_consensus::network_interface::ConsensusMsg;
use aptos_dkg_runtime::DKGMessage;
use aptos_event_notifications::EventSubscriptionService;
use aptos_jwk_consensus::JWKConsensusMsg;
use aptos_jwk_consensus::types::JWKConsensusMsg;
use aptos_logger::debug;
use aptos_mempool::network::MempoolSyncMsg;
use aptos_network::{
Expand Down
25 changes: 25 additions & 0 deletions crates/aptos-jwk-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,40 @@ repository = { workspace = true }
rust-version = { workspace = true }

[dependencies]
anyhow = { workspace = true }
aptos-bitvec = { workspace = true }
aptos-bounded-executor = { workspace = true }
aptos-channels = { workspace = true }
aptos-config = { workspace = true }
aptos-consensus-types = { workspace = true }
aptos-crypto = { workspace = true }
aptos-enum-conversion-derive = { workspace = true }
aptos-event-notifications = { workspace = true }
aptos-global-constants = { workspace = true }
aptos-infallible = { workspace = true }
aptos-logger = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-network = { workspace = true }
aptos-reliable-broadcast = { workspace = true }
aptos-runtimes = { workspace = true }
aptos-time-service = { workspace = true }
aptos-types = { workspace = true }
aptos-validator-transaction-pool = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
futures-channel = { workspace = true }
futures-util = { workspace = true }
move-core-types = { workspace = true }
once_cell = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tokio-retry = { workspace = true }

[dev-dependencies]
aptos-types = { workspace = true, features = ["fuzzing"] }
aptos-validator-transaction-pool = { workspace = true, features = ["fuzzing"] }
[features]
smoke-test = []
38 changes: 4 additions & 34 deletions crates/aptos-jwk-consensus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
// Copyright © Aptos Foundation

use aptos_config::network_id::{NetworkId, PeerNetworkId};
use crate::types::JWKConsensusMsg;
use aptos_event_notifications::{
DbBackedOnChainConfig, EventNotificationListener, ReconfigNotificationListener,
};
use aptos_network::application::{
error::Error,
interface::{NetworkClient, NetworkClientInterface, NetworkServiceEvents},
};
use aptos_types::PeerId;
use aptos_network::application::interface::{NetworkClient, NetworkServiceEvents};
use aptos_validator_transaction_pool::VTxnPoolState;
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::runtime::Runtime;

#[allow(clippy::let_and_return)]
Expand All @@ -35,30 +29,6 @@ pub fn start_jwk_consensus_runtime(
runtime
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct JWKConsensusMsg {}

#[derive(Clone)]
pub struct JWKNetworkClient<NetworkClient> {
network_client: NetworkClient,
}

impl<NetworkClient: NetworkClientInterface<JWKConsensusMsg>> JWKNetworkClient<NetworkClient> {
pub fn new(network_client: NetworkClient) -> Self {
Self { network_client }
}

pub async fn send_rpc(
&self,
peer: PeerId,
message: JWKConsensusMsg,
rpc_timeout: Duration,
) -> Result<JWKConsensusMsg, Error> {
let peer_network_id = PeerNetworkId::new(NetworkId::Validator, peer);
self.network_client
.send_to_peer_rpc(message, rpc_timeout, peer_network_id)
.await
}
}

pub mod network;
pub mod network_interface;
pub mod types;
184 changes: 184 additions & 0 deletions crates/aptos-jwk-consensus/src/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright © Aptos Foundation

use crate::{
network_interface::{JWKConsensusNetworkClient, RPC},
types::JWKConsensusMsg,
};
use anyhow::bail;
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::network_id::NetworkId;
use aptos_consensus_types::common::Author;
#[cfg(test)]
use aptos_infallible::RwLock;
use aptos_logger::warn;
use aptos_network::{
application::interface::{NetworkClient, NetworkServiceEvents},
protocols::network::{Event, RpcError},
ProtocolId,
};
use aptos_reliable_broadcast::RBNetworkSender;
use aptos_types::account_address::AccountAddress;
use bytes::Bytes;
use futures::Stream;
use futures_channel::oneshot;
use futures_util::{
stream::{select, select_all, StreamExt},
SinkExt,
};
#[cfg(test)]
use std::sync::Arc;
use std::time::Duration;
use tokio::time::timeout;

pub struct IncomingRpcRequest {
pub msg: JWKConsensusMsg,
pub sender: AccountAddress,
pub response_sender: Box<dyn RpcResponseSender>,
}

pub struct NetworkSender {
author: AccountAddress,
jwk_network_client: JWKConsensusNetworkClient<NetworkClient<JWKConsensusMsg>>,
self_sender: aptos_channels::Sender<Event<JWKConsensusMsg>>,
}

impl NetworkSender {
pub fn new(
author: AccountAddress,
jwk_network_client: JWKConsensusNetworkClient<NetworkClient<JWKConsensusMsg>>,
self_sender: aptos_channels::Sender<Event<JWKConsensusMsg>>,
) -> Self {
Self {
author,
jwk_network_client,
self_sender,
}
}
}

#[async_trait::async_trait]
impl RBNetworkSender<JWKConsensusMsg> for NetworkSender {
async fn send_rb_rpc(
&self,
receiver: Author,
msg: JWKConsensusMsg,
time_limit: Duration,
) -> anyhow::Result<JWKConsensusMsg> {
if receiver == self.author {
let (tx, rx) = oneshot::channel();
let self_msg = Event::RpcRequest(receiver, msg, RPC[0], tx);
self.self_sender.clone().send(self_msg).await?;
if let Ok(Ok(Ok(bytes))) = timeout(time_limit, rx).await {
Ok(RPC[0].from_bytes(&bytes)?)
} else {
bail!("self rpc failed");
}
} else {
let result = self
.jwk_network_client
.send_rpc(receiver, msg, time_limit)
.await?;
Ok(result)
}
}
}

pub trait RpcResponseSender: Send + Sync {
fn send(&mut self, response: anyhow::Result<JWKConsensusMsg>);
}

pub struct RealRpcResponseSender {
pub inner: Option<oneshot::Sender<Result<Bytes, RpcError>>>,
pub protocol: ProtocolId,
}

impl RpcResponseSender for RealRpcResponseSender {
fn send(&mut self, response: anyhow::Result<JWKConsensusMsg>) {
let rpc_response = response
.and_then(|msg| self.protocol.to_bytes(&msg).map(Bytes::from))
.map_err(RpcError::ApplicationError);
if let Some(tx) = self.inner.take() {
let _ = tx.send(rpc_response);
}
}
}

#[cfg(test)]
pub struct DummyRpcResponseSender {
pub rpc_response_collector: Arc<RwLock<Vec<anyhow::Result<JWKConsensusMsg>>>>,
}

#[cfg(test)]
impl DummyRpcResponseSender {
pub fn new(rpc_response_collector: Arc<RwLock<Vec<anyhow::Result<JWKConsensusMsg>>>>) -> Self {
Self {
rpc_response_collector,
}
}
}

#[cfg(test)]
impl RpcResponseSender for DummyRpcResponseSender {
fn send(&mut self, response: anyhow::Result<JWKConsensusMsg>) {
self.rpc_response_collector.write().push(response);
}
}

pub struct NetworkReceivers {
pub rpc_rx: aptos_channel::Receiver<AccountAddress, (AccountAddress, IncomingRpcRequest)>,
}

pub struct NetworkTask {
all_events: Box<dyn Stream<Item = Event<JWKConsensusMsg>> + Send + Unpin>,
rpc_tx: aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingRpcRequest)>,
}

impl NetworkTask {
/// Establishes the initial connections with the peers and returns the receivers.
pub fn new(
network_service_events: NetworkServiceEvents<JWKConsensusMsg>,
self_receiver: aptos_channels::Receiver<Event<JWKConsensusMsg>>,
) -> (NetworkTask, NetworkReceivers) {
let (rpc_tx, rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 10, None);

let network_and_events = network_service_events.into_network_and_events();
if (network_and_events.values().len() != 1)
|| !network_and_events.contains_key(&NetworkId::Validator)
{
panic!("The network has not been setup correctly for JWK consensus!");
}

// Collect all the network events into a single stream
let network_events: Vec<_> = network_and_events.into_values().collect();
let network_events = select_all(network_events).fuse();
let all_events = Box::new(select(network_events, self_receiver));

(NetworkTask { rpc_tx, all_events }, NetworkReceivers {
rpc_rx,
})
}

pub async fn start(mut self) {
while let Some(message) = self.all_events.next().await {
match message {
Event::RpcRequest(peer_id, msg, protocol, response_sender) => {
let req = IncomingRpcRequest {
msg,
sender: peer_id,
response_sender: Box::new(RealRpcResponseSender {
inner: Some(response_sender),
protocol,
}),
};

if let Err(e) = self.rpc_tx.push(peer_id, (peer_id, req)) {
warn!(error = ?e, "aptos channel closed");
};
},
_ => {
// Ignore
},
}
}
}
}
34 changes: 33 additions & 1 deletion crates/aptos-jwk-consensus/src/network_interface.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
// Copyright © Aptos Foundation

use aptos_network::ProtocolId;
use crate::types::JWKConsensusMsg;
use aptos_config::network_id::{NetworkId, PeerNetworkId};
use aptos_network::{
application::{error::Error, interface::NetworkClientInterface},
ProtocolId,
};
use move_core_types::account_address::AccountAddress as PeerId;
use std::time::Duration;

/// Supported protocols in preferred order (from highest priority to lowest).
pub const DIRECT_SEND: &[ProtocolId] = &[
Expand All @@ -15,3 +22,28 @@ pub const RPC: &[ProtocolId] = &[
ProtocolId::JWKConsensusRpcBcs,
ProtocolId::JWKConsensusRpcJson,
];

#[derive(Clone)]
pub struct JWKConsensusNetworkClient<NetworkClient> {
network_client: NetworkClient,
}

impl<NetworkClient: NetworkClientInterface<JWKConsensusMsg>>
JWKConsensusNetworkClient<NetworkClient>
{
pub fn new(network_client: NetworkClient) -> Self {
Self { network_client }
}

pub async fn send_rpc(
&self,
peer: PeerId,
message: JWKConsensusMsg,
rpc_timeout: Duration,
) -> Result<JWKConsensusMsg, Error> {
let peer_network_id = PeerNetworkId::new(NetworkId::Validator, peer);
self.network_client
.send_to_peer_rpc(message, rpc_timeout, peer_network_id)
.await
}
}
Loading

0 comments on commit dae6f11

Please sign in to comment.