Skip to content

Commit

Permalink
Introduce network transaction manager (paritytech#32)
Browse files Browse the repository at this point in the history
* Nits

* Initial network transaction manager

* Answer the peer's transaction request

* Remove timeout transactions

* Add rpc `subcoin_sendTransaction`

* Add transaction from peer

* Add rpc subcoin_getRawTransaction

* Add rpc subcoin_decodeRawTransaction

* Fix bitcoin transaction broadcasting

* Introduce SendTransactionResult

* serde camelCase

* Nit
  • Loading branch information
liuchengxu authored Aug 5, 2024
1 parent 16ec401 commit 1948535
Show file tree
Hide file tree
Showing 9 changed files with 379 additions and 102 deletions.
5 changes: 0 additions & 5 deletions crates/subcoin-network/src/block_downloader/blocks_first.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::block_downloader::BlockDownloadManager;
use crate::sync::{LocatorRequest, SyncAction, SyncRequest};
use crate::{Error, PeerId, SyncStatus};
use bitcoin::hashes::Hash;
use bitcoin::p2p::message::MAX_INV_SIZE;
use bitcoin::p2p::message_blockdata::Inventory;
use bitcoin::{Block as BitcoinBlock, BlockHash};
use sc_client_api::AuxStore;
Expand Down Expand Up @@ -136,10 +135,6 @@ where
// NOTE: `inv` can be received unsolicited as an announcement of a new block,
// or in reply to `getblocks`.
pub(crate) fn on_inv(&mut self, inventories: Vec<Inventory>, from: PeerId) -> SyncAction {
if inventories.len() > MAX_INV_SIZE {
return SyncAction::Disconnect(from, Error::TooManyInventoryItems);
}

// TODO: only handle the data from self.peer_id?

let mut block_data_request = Vec::new();
Expand Down
4 changes: 2 additions & 2 deletions crates/subcoin-network/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use std::time::Duration;
use tokio::net::TcpStream;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

const MSG_HEADER_SIZE: usize = 24;

/// Channel for sending messages to the peer.
pub type ConnectionWriter = UnboundedSender<NetworkMessage>;

const MSG_HEADER_SIZE: usize = 24;

/// Represents the direction of connection.
///
/// This enum is used to distinguish between inbound and outbound connections.
Expand Down
97 changes: 78 additions & 19 deletions crates/subcoin-network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ mod peer_manager;
mod sync;
#[cfg(test)]
mod tests;
mod transaction_manager;
mod worker;

use crate::connection::ConnectionInitiator;
use crate::worker::NetworkWorker;
use bitcoin::p2p::ServiceFlags;
use bitcoin::{BlockHash, Network as BitcoinNetwork};
use bitcoin::{BlockHash, Network as BitcoinNetwork, Transaction, Txid};
use peer_manager::HandshakeState;
use sc_client_api::{AuxStore, HeaderBackend};
use sc_consensus_nakamoto::BlockImportQueue;
Expand All @@ -65,6 +66,8 @@ pub type Latency = u128;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Invalid bootnode address: {0}")]
InvalidBootnode(String),
#[error("Received 0 bytes, peer performed an orderly shutdown")]
PeerShutdown,
#[error("Cannot communicate with the network event stream")]
Expand All @@ -75,6 +78,22 @@ pub enum Error {
ConnectionNotFound(PeerId),
#[error("Connecting to the stream timed out")]
ConnectionTimeout,
#[error("Unexpected handshake state: {0:?}")]
UnexpectedHandshakeState(Box<HandshakeState>),
#[error("Only IPv4 peers are supported")]
Ipv4Only,
#[error("Peer is not a full node")]
NotFullNode,
#[error("Peer is not a segwit node")]
NotSegwitNode,
#[error("Peer's protocol version is too low")]
ProtocolVersionTooLow,
#[error("Too many block entries in inv message")]
TooManyBlockEntries,
#[error("Too many headers (> 2000)")]
TooManyHeaders,
#[error("Too many inventory items")]
TooManyInventoryItems,
#[error("Ping timeout")]
PingTimeout,
#[error("Ping latency exceeds the threshold")]
Expand All @@ -83,28 +102,10 @@ pub enum Error {
SlowPeer(Latency),
#[error("Unexpected pong message")]
UnexpectedPong,
#[error("Too many block entries in inv message")]
TooManyBlockEntries,
#[error("Too many headers (> 2000)")]
TooManyHeaders,
#[error("Too many inventory items ")]
TooManyInventoryItems,
#[error("Peer is not a full node")]
NotFullNode,
#[error("Peer is not a segwit node")]
NotSegwitNode,
#[error("Peer's protocol version is too low")]
ProtocolVersionTooLow,
#[error("Invalid pong message: bad nonce")]
BadPong,
#[error("Invalid bootnode address: {0}")]
InvalidBootnode(String),
#[error("Received an unrequested block: {0:?}")]
UnrequestedBlock(BlockHash),
#[error("Unexpected handshake state: {0:?}")]
UnexpectedHandshakeState(Box<HandshakeState>),
#[error("Only IPv4 peers are supported")]
Ipv4Only,
#[error("Other: {0}")]
Other(String),
#[error(transparent)]
Expand Down Expand Up @@ -177,6 +178,7 @@ pub enum SyncStrategy {

/// Represents the sync status of node.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SyncStatus {
/// The node is idle and not currently major syncing.
Idle,
Expand All @@ -191,6 +193,7 @@ pub enum SyncStatus {

/// Represents the status of network.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NetworkStatus {
/// The number of peers currently connected to the node.
pub num_connected_peers: usize,
Expand All @@ -217,6 +220,20 @@ impl Clone for Bandwidth {
}
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SendTransactionResult {
Success(Txid),
Failure(String),
}

/// An incoming transaction from RPC or network.
#[derive(Debug)]
pub(crate) struct IncomingTransaction {
pub(crate) txid: Txid,
pub(crate) transaction: Transaction,
}

/// Message to the network worker.
#[derive(Debug)]
enum NetworkWorkerMessage {
Expand All @@ -226,6 +243,10 @@ enum NetworkWorkerMessage {
SyncPeers(oneshot::Sender<Vec<PeerSync>>),
/// Retrieve the number of inbound connected peers.
InboundPeersCount(oneshot::Sender<usize>),
/// Retrieve the transaction.
GetTransaction((Txid, oneshot::Sender<Option<Transaction>>)),
/// Add transaction to the transaction manager.
SendTransaction((IncomingTransaction, oneshot::Sender<SendTransactionResult>)),
}

/// A handle for interacting with the network worker.
Expand Down Expand Up @@ -268,6 +289,44 @@ impl NetworkHandle {
receiver.await.unwrap_or_default()
}

pub async fn get_transaction(&self, txid: Txid) -> Option<Transaction> {
let (sender, receiver) = oneshot::channel();

if self
.worker_msg_sender
.unbounded_send(NetworkWorkerMessage::GetTransaction((txid, sender)))
.is_err()
{
return None;
}

receiver.await.ok().flatten()
}

pub async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult {
let (sender, receiver) = oneshot::channel();

let txid = transaction.compute_txid();
let incoming_transaction = IncomingTransaction { txid, transaction };

if self
.worker_msg_sender
.unbounded_send(NetworkWorkerMessage::SendTransaction((
incoming_transaction,
sender,
)))
.is_err()
{
return SendTransactionResult::Failure(format!(
"Failed to send transaction ({txid}) to worker"
));
}

receiver
.await
.unwrap_or(SendTransactionResult::Failure("Internal error".to_string()))
}

/// Returns a flag indicating whether the node is actively performing a major sync.
pub fn is_major_syncing(&self) -> Arc<AtomicBool> {
self.is_major_syncing.clone()
Expand Down
48 changes: 26 additions & 22 deletions crates/subcoin-network/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,33 +325,31 @@ where
}
None
} else if self.last_eviction.elapsed() > EVICTION_INTERVAL {
// Find the slowest peer.
//
// The set of outbound peers is full and the eviction interval elapsed,
// try to evict the slowest peer for discovering potential better peers.
self.find_slowest_peer()
self.connected_peers
.iter()
.filter_map(|(peer_id, peer_info)| {
let average_latency = peer_info.ping_latency.average();

if average_latency > SLOW_PEER_LATENCY {
Some((peer_id, average_latency))
} else {
None
}
})
.max_by_key(|(_peer_id, average_latency)| *average_latency)
.map(|(peer_id, peer_latency)| SlowPeer {
peer_id: *peer_id,
peer_latency,
})
} else {
None
}
}

fn find_slowest_peer(&self) -> Option<SlowPeer> {
self.connected_peers
.iter()
.filter_map(|(peer_id, peer_info)| {
let average_latency = peer_info.ping_latency.average();

if average_latency > SLOW_PEER_LATENCY {
Some((peer_id, average_latency))
} else {
None
}
})
.max_by_key(|(_peer_id, average_latency)| *average_latency)
.map(|(peer_id, peer_latency)| SlowPeer {
peer_id: *peer_id,
peer_latency,
})
}

fn send_pings(&mut self, should_pings: Vec<PeerId>) {
for peer_id in should_pings {
if let Some(peer_info) = self.connected_peers.get_mut(&peer_id) {
Expand Down Expand Up @@ -425,8 +423,13 @@ where
self.connected_peers.contains_key(&peer_id)
}

/// Returns the list of connected peers.
pub(crate) fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.connected_peers.keys()
}

/// Returns the number of connected peers.
pub(crate) fn connect_peers_count(&self) -> usize {
pub(crate) fn connected_peers_count(&self) -> usize {
self.connected_peers.len()
}

Expand Down Expand Up @@ -561,7 +564,8 @@ where
self.send(peer_id, NetworkMessage::Version(our_version))?;

if greatest_common_version >= WTXID_RELAY_VERSION {
self.send(peer_id, NetworkMessage::WtxidRelay)?;
// TODO: support wtxidrelay
// self.send(peer_id, NetworkMessage::WtxidRelay)?;
}

// if greatest_common_version >= 70016 {
Expand Down
9 changes: 6 additions & 3 deletions crates/subcoin-network/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ use subcoin_primitives::ClientExt;
// Do major sync when the current tip falls behind the network by 144 blocks (roughly one day).
const MAJOR_SYNC_GAP: u32 = 144;

const LATENCY_IMPROVEMENT_THRESHOLD: f64 = 1.2;

/// The state of syncing between a Peer and ourselves.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum PeerSyncState {
/// Available for sync requests.
Available,
Expand All @@ -40,6 +43,7 @@ impl PeerSyncState {

/// Ping letency of the peer.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum PeerLatency {
Unknown,
Average(Latency),
Expand All @@ -64,6 +68,7 @@ impl PartialOrd for PeerLatency {

/// Contains all the data about a Peer that we are trying to sync with.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PeerSync {
/// Peer id of this peer.
pub peer_id: PeerId,
Expand All @@ -76,7 +81,7 @@ pub struct PeerSync {
pub state: PeerSyncState,
}

/// Locator based sync request, for either Header or Block.
/// Locator based sync request, for requesting either Headers or Blocks.
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct LocatorRequest {
pub locator_hashes: Vec<BlockHash>,
Expand Down Expand Up @@ -294,8 +299,6 @@ where
if let (PeerLatency::Average(current_latency), PeerLatency::Average(best_latency)) =
(current_sync_peer.latency, best_sync_peer.latency)
{
const LATENCY_IMPROVEMENT_THRESHOLD: f64 = 1.2;

// Update sync peer if the latency improvement is significant.
if current_latency as f64 / best_latency as f64 > LATENCY_IMPROVEMENT_THRESHOLD {
let peer_id = best_sync_peer.peer_id;
Expand Down
Loading

0 comments on commit 1948535

Please sign in to comment.