From 1948535f73c2f0efb980ef3f618ef1dddfcf6aad Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Mon, 5 Aug 2024 08:46:27 +0800 Subject: [PATCH] Introduce network transaction manager (#32) * Nits * Initial network transaction manager * Answer the peer's transaction request * Remove timeout transactions * Add rpc `subcoin_sendTransaction` * Add transaction from peer * Add rpc subcoin_getRawTransaction * Add rpc subcoin_decodeRawTransaction * Fix bitcoin transaction broadcasting * Introduce SendTransactionResult * serde camelCase * Nit --- .../src/block_downloader/blocks_first.rs | 5 - crates/subcoin-network/src/connection.rs | 4 +- crates/subcoin-network/src/lib.rs | 97 ++++++++--- crates/subcoin-network/src/peer_manager.rs | 48 +++--- crates/subcoin-network/src/sync.rs | 9 +- .../src/transaction_manager.rs | 110 +++++++++++++ crates/subcoin-network/src/worker.rs | 155 ++++++++++++------ crates/subcoin-rpc/src/error.rs | 5 + crates/subcoin-rpc/src/subcoin.rs | 48 +++++- 9 files changed, 379 insertions(+), 102 deletions(-) create mode 100644 crates/subcoin-network/src/transaction_manager.rs diff --git a/crates/subcoin-network/src/block_downloader/blocks_first.rs b/crates/subcoin-network/src/block_downloader/blocks_first.rs index 2d738891c3239..a074993c62197 100644 --- a/crates/subcoin-network/src/block_downloader/blocks_first.rs +++ b/crates/subcoin-network/src/block_downloader/blocks_first.rs @@ -2,7 +2,6 @@ use crate::block_downloader::BlockDownloadManager; use crate::sync::{LocatorRequest, SyncAction, SyncRequest}; use crate::{Error, PeerId, SyncStatus}; use bitcoin::hashes::Hash; -use bitcoin::p2p::message::MAX_INV_SIZE; use bitcoin::p2p::message_blockdata::Inventory; use bitcoin::{Block as BitcoinBlock, BlockHash}; use sc_client_api::AuxStore; @@ -136,10 +135,6 @@ where // NOTE: `inv` can be received unsolicited as an announcement of a new block, // or in reply to `getblocks`. pub(crate) fn on_inv(&mut self, inventories: Vec, from: PeerId) -> SyncAction { - if inventories.len() > MAX_INV_SIZE { - return SyncAction::Disconnect(from, Error::TooManyInventoryItems); - } - // TODO: only handle the data from self.peer_id? let mut block_data_request = Vec::new(); diff --git a/crates/subcoin-network/src/connection.rs b/crates/subcoin-network/src/connection.rs index a4677107635aa..42b425cad984f 100644 --- a/crates/subcoin-network/src/connection.rs +++ b/crates/subcoin-network/src/connection.rs @@ -12,11 +12,11 @@ use std::time::Duration; use tokio::net::TcpStream; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +const MSG_HEADER_SIZE: usize = 24; + /// Channel for sending messages to the peer. pub type ConnectionWriter = UnboundedSender; -const MSG_HEADER_SIZE: usize = 24; - /// Represents the direction of connection. /// /// This enum is used to distinguish between inbound and outbound connections. diff --git a/crates/subcoin-network/src/lib.rs b/crates/subcoin-network/src/lib.rs index 167f3c93cb312..b39e16bc13981 100644 --- a/crates/subcoin-network/src/lib.rs +++ b/crates/subcoin-network/src/lib.rs @@ -35,12 +35,13 @@ mod peer_manager; mod sync; #[cfg(test)] mod tests; +mod transaction_manager; mod worker; use crate::connection::ConnectionInitiator; use crate::worker::NetworkWorker; use bitcoin::p2p::ServiceFlags; -use bitcoin::{BlockHash, Network as BitcoinNetwork}; +use bitcoin::{BlockHash, Network as BitcoinNetwork, Transaction, Txid}; use peer_manager::HandshakeState; use sc_client_api::{AuxStore, HeaderBackend}; use sc_consensus_nakamoto::BlockImportQueue; @@ -65,6 +66,8 @@ pub type Latency = u128; #[derive(Debug, thiserror::Error)] pub enum Error { + #[error("Invalid bootnode address: {0}")] + InvalidBootnode(String), #[error("Received 0 bytes, peer performed an orderly shutdown")] PeerShutdown, #[error("Cannot communicate with the network event stream")] @@ -75,6 +78,22 @@ pub enum Error { ConnectionNotFound(PeerId), #[error("Connecting to the stream timed out")] ConnectionTimeout, + #[error("Unexpected handshake state: {0:?}")] + UnexpectedHandshakeState(Box), + #[error("Only IPv4 peers are supported")] + Ipv4Only, + #[error("Peer is not a full node")] + NotFullNode, + #[error("Peer is not a segwit node")] + NotSegwitNode, + #[error("Peer's protocol version is too low")] + ProtocolVersionTooLow, + #[error("Too many block entries in inv message")] + TooManyBlockEntries, + #[error("Too many headers (> 2000)")] + TooManyHeaders, + #[error("Too many inventory items")] + TooManyInventoryItems, #[error("Ping timeout")] PingTimeout, #[error("Ping latency exceeds the threshold")] @@ -83,28 +102,10 @@ pub enum Error { SlowPeer(Latency), #[error("Unexpected pong message")] UnexpectedPong, - #[error("Too many block entries in inv message")] - TooManyBlockEntries, - #[error("Too many headers (> 2000)")] - TooManyHeaders, - #[error("Too many inventory items ")] - TooManyInventoryItems, - #[error("Peer is not a full node")] - NotFullNode, - #[error("Peer is not a segwit node")] - NotSegwitNode, - #[error("Peer's protocol version is too low")] - ProtocolVersionTooLow, #[error("Invalid pong message: bad nonce")] BadPong, - #[error("Invalid bootnode address: {0}")] - InvalidBootnode(String), #[error("Received an unrequested block: {0:?}")] UnrequestedBlock(BlockHash), - #[error("Unexpected handshake state: {0:?}")] - UnexpectedHandshakeState(Box), - #[error("Only IPv4 peers are supported")] - Ipv4Only, #[error("Other: {0}")] Other(String), #[error(transparent)] @@ -177,6 +178,7 @@ pub enum SyncStrategy { /// Represents the sync status of node. #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub enum SyncStatus { /// The node is idle and not currently major syncing. Idle, @@ -191,6 +193,7 @@ pub enum SyncStatus { /// Represents the status of network. #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct NetworkStatus { /// The number of peers currently connected to the node. pub num_connected_peers: usize, @@ -217,6 +220,20 @@ impl Clone for Bandwidth { } } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum SendTransactionResult { + Success(Txid), + Failure(String), +} + +/// An incoming transaction from RPC or network. +#[derive(Debug)] +pub(crate) struct IncomingTransaction { + pub(crate) txid: Txid, + pub(crate) transaction: Transaction, +} + /// Message to the network worker. #[derive(Debug)] enum NetworkWorkerMessage { @@ -226,6 +243,10 @@ enum NetworkWorkerMessage { SyncPeers(oneshot::Sender>), /// Retrieve the number of inbound connected peers. InboundPeersCount(oneshot::Sender), + /// Retrieve the transaction. + GetTransaction((Txid, oneshot::Sender>)), + /// Add transaction to the transaction manager. + SendTransaction((IncomingTransaction, oneshot::Sender)), } /// A handle for interacting with the network worker. @@ -268,6 +289,44 @@ impl NetworkHandle { receiver.await.unwrap_or_default() } + pub async fn get_transaction(&self, txid: Txid) -> Option { + let (sender, receiver) = oneshot::channel(); + + if self + .worker_msg_sender + .unbounded_send(NetworkWorkerMessage::GetTransaction((txid, sender))) + .is_err() + { + return None; + } + + receiver.await.ok().flatten() + } + + pub async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult { + let (sender, receiver) = oneshot::channel(); + + let txid = transaction.compute_txid(); + let incoming_transaction = IncomingTransaction { txid, transaction }; + + if self + .worker_msg_sender + .unbounded_send(NetworkWorkerMessage::SendTransaction(( + incoming_transaction, + sender, + ))) + .is_err() + { + return SendTransactionResult::Failure(format!( + "Failed to send transaction ({txid}) to worker" + )); + } + + receiver + .await + .unwrap_or(SendTransactionResult::Failure("Internal error".to_string())) + } + /// Returns a flag indicating whether the node is actively performing a major sync. pub fn is_major_syncing(&self) -> Arc { self.is_major_syncing.clone() diff --git a/crates/subcoin-network/src/peer_manager.rs b/crates/subcoin-network/src/peer_manager.rs index 1093520896391..69c5926af767a 100644 --- a/crates/subcoin-network/src/peer_manager.rs +++ b/crates/subcoin-network/src/peer_manager.rs @@ -325,33 +325,31 @@ where } None } else if self.last_eviction.elapsed() > EVICTION_INTERVAL { + // Find the slowest peer. + // // The set of outbound peers is full and the eviction interval elapsed, // try to evict the slowest peer for discovering potential better peers. - self.find_slowest_peer() + self.connected_peers + .iter() + .filter_map(|(peer_id, peer_info)| { + let average_latency = peer_info.ping_latency.average(); + + if average_latency > SLOW_PEER_LATENCY { + Some((peer_id, average_latency)) + } else { + None + } + }) + .max_by_key(|(_peer_id, average_latency)| *average_latency) + .map(|(peer_id, peer_latency)| SlowPeer { + peer_id: *peer_id, + peer_latency, + }) } else { None } } - fn find_slowest_peer(&self) -> Option { - self.connected_peers - .iter() - .filter_map(|(peer_id, peer_info)| { - let average_latency = peer_info.ping_latency.average(); - - if average_latency > SLOW_PEER_LATENCY { - Some((peer_id, average_latency)) - } else { - None - } - }) - .max_by_key(|(_peer_id, average_latency)| *average_latency) - .map(|(peer_id, peer_latency)| SlowPeer { - peer_id: *peer_id, - peer_latency, - }) - } - fn send_pings(&mut self, should_pings: Vec) { for peer_id in should_pings { if let Some(peer_info) = self.connected_peers.get_mut(&peer_id) { @@ -425,8 +423,13 @@ where self.connected_peers.contains_key(&peer_id) } + /// Returns the list of connected peers. + pub(crate) fn connected_peers(&self) -> impl Iterator { + self.connected_peers.keys() + } + /// Returns the number of connected peers. - pub(crate) fn connect_peers_count(&self) -> usize { + pub(crate) fn connected_peers_count(&self) -> usize { self.connected_peers.len() } @@ -561,7 +564,8 @@ where self.send(peer_id, NetworkMessage::Version(our_version))?; if greatest_common_version >= WTXID_RELAY_VERSION { - self.send(peer_id, NetworkMessage::WtxidRelay)?; + // TODO: support wtxidrelay + // self.send(peer_id, NetworkMessage::WtxidRelay)?; } // if greatest_common_version >= 70016 { diff --git a/crates/subcoin-network/src/sync.rs b/crates/subcoin-network/src/sync.rs index 3e34fcf8a6252..6917c608a6424 100644 --- a/crates/subcoin-network/src/sync.rs +++ b/crates/subcoin-network/src/sync.rs @@ -20,8 +20,11 @@ use subcoin_primitives::ClientExt; // Do major sync when the current tip falls behind the network by 144 blocks (roughly one day). const MAJOR_SYNC_GAP: u32 = 144; +const LATENCY_IMPROVEMENT_THRESHOLD: f64 = 1.2; + /// The state of syncing between a Peer and ourselves. #[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub enum PeerSyncState { /// Available for sync requests. Available, @@ -40,6 +43,7 @@ impl PeerSyncState { /// Ping letency of the peer. #[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub enum PeerLatency { Unknown, Average(Latency), @@ -64,6 +68,7 @@ impl PartialOrd for PeerLatency { /// Contains all the data about a Peer that we are trying to sync with. #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct PeerSync { /// Peer id of this peer. pub peer_id: PeerId, @@ -76,7 +81,7 @@ pub struct PeerSync { pub state: PeerSyncState, } -/// Locator based sync request, for either Header or Block. +/// Locator based sync request, for requesting either Headers or Blocks. #[derive(Debug, PartialEq, Eq)] pub(crate) struct LocatorRequest { pub locator_hashes: Vec, @@ -294,8 +299,6 @@ where if let (PeerLatency::Average(current_latency), PeerLatency::Average(best_latency)) = (current_sync_peer.latency, best_sync_peer.latency) { - const LATENCY_IMPROVEMENT_THRESHOLD: f64 = 1.2; - // Update sync peer if the latency improvement is significant. if current_latency as f64 / best_latency as f64 > LATENCY_IMPROVEMENT_THRESHOLD { let peer_id = best_sync_peer.peer_id; diff --git a/crates/subcoin-network/src/transaction_manager.rs b/crates/subcoin-network/src/transaction_manager.rs new file mode 100644 index 0000000000000..ab325920626ee --- /dev/null +++ b/crates/subcoin-network/src/transaction_manager.rs @@ -0,0 +1,110 @@ +use crate::{IncomingTransaction, PeerId}; +use bitcoin::{Transaction, Txid}; +use indexmap::map::Entry; +use indexmap::IndexMap; +use std::collections::HashSet; +use std::time::{Duration, SystemTime}; + +const TRANSACTION_TIMEOUT_DURATION_SECS: u64 = 10 * 60; + +#[derive(Debug)] +struct TransactionInfo { + /// The actual transaction to be sent to the network. + transaction: Transaction, + /// Set of peers to which we advertised this transaction. + /// + /// Note that having a peer in this set doesn't guarantee the the peer actually + /// received the transaction. + advertised: HashSet, + /// How long the transaction should be stored. + ttl: SystemTime, +} + +impl TransactionInfo { + fn new(transaction: Transaction) -> Self { + Self { + transaction, + advertised: HashSet::new(), + ttl: SystemTime::now() + Duration::from_secs(TRANSACTION_TIMEOUT_DURATION_SECS), + } + } +} + +/// This struct manages the transactions received from the network. +#[derive(Debug)] +pub(crate) struct TransactionManager { + /// List of transactions tracked by this manager, in the FIFO order. + transactions: IndexMap, +} + +impl TransactionManager { + /// Maximum number of transactions the manager holds. + const MAX_TRANSACTIONS: usize = 256; + + pub fn new() -> Self { + Self { + transactions: IndexMap::new(), + } + } + + /// Broadcast known transaction IDs to the connected peers. + pub fn on_tick<'a>( + &mut self, + connected_peers: impl Iterator, + ) -> Vec<(PeerId, Vec)> { + // Remove timeout transactions. + let now = SystemTime::now(); + self.transactions.retain(|txid, info| { + if info.ttl < now { + tracing::debug!("Removing timeout transaction {txid}"); + false + } else { + true + } + }); + + connected_peers + .filter_map(|address| { + let mut to_advertise = vec![]; + + for (txid, info) in self.transactions.iter_mut() { + if !info.advertised.contains(address) { + to_advertise.push(*txid); + info.advertised.insert(*address); + } + } + + if to_advertise.is_empty() { + None + } else { + Some((*address, to_advertise)) + } + }) + .collect() + } + + pub fn get_transaction(&self, txid: &Txid) -> Option { + self.transactions + .get(txid) + .map(|tx_info| tx_info.transaction.clone()) + } + + pub fn add_transaction( + &mut self, + incoming_transaction: IncomingTransaction, + ) -> Result { + let IncomingTransaction { txid, transaction } = incoming_transaction; + + if self.transactions.len() == Self::MAX_TRANSACTIONS { + self.transactions.shift_remove_index(0); + } + + match self.transactions.entry(txid) { + Entry::Occupied(_) => Err(format!("Already have transaction {txid}")), + Entry::Vacant(entry) => { + entry.insert(TransactionInfo::new(transaction)); + Ok(txid) + } + } + } +} diff --git a/crates/subcoin-network/src/worker.rs b/crates/subcoin-network/src/worker.rs index 6f4fb6e6d76a1..7537bbf368b94 100644 --- a/crates/subcoin-network/src/worker.rs +++ b/crates/subcoin-network/src/worker.rs @@ -1,8 +1,12 @@ use crate::connection::{ConnectionInitiator, Direction, NewConnection}; use crate::peer_manager::{Config, PeerManager, SlowPeer}; use crate::sync::{ChainSync, LocatorRequest, SyncAction, SyncRequest}; -use crate::{Bandwidth, Error, Latency, NetworkStatus, NetworkWorkerMessage, PeerId, SyncStrategy}; -use bitcoin::p2p::message::NetworkMessage; +use crate::transaction_manager::TransactionManager; +use crate::{ + Bandwidth, Error, IncomingTransaction, Latency, NetworkStatus, NetworkWorkerMessage, PeerId, + SendTransactionResult, SyncStrategy, +}; +use bitcoin::p2p::message::{NetworkMessage, MAX_INV_SIZE}; use bitcoin::p2p::message_blockdata::{GetBlocksMessage, GetHeadersMessage, Inventory}; use futures::stream::FusedStream; use futures::StreamExt; @@ -44,6 +48,7 @@ pub struct NetworkWorker { config: Config, network_event_receiver: UnboundedReceiver, peer_manager: PeerManager, + transaction_manager: TransactionManager, chain_sync: ChainSync, } @@ -71,6 +76,7 @@ where connection_initiator, max_outbound_peers, ), + transaction_manager: TransactionManager::new(), chain_sync: ChainSync::new(client, import_queue, sync_strategy, is_major_syncing), config, } @@ -115,9 +121,42 @@ where } } + async fn process_event(&mut self, event: Event) { + match event { + Event::NewConnection(new_connection) => { + self.peer_manager.on_new_connection(new_connection); + } + Event::OutboundConnectionFailure { peer_addr, reason } => { + self.peer_manager + .on_outbound_connection_failure(peer_addr, reason); + } + Event::Disconnect { peer_addr, reason } => { + self.peer_manager.disconnect(peer_addr, reason); + self.chain_sync.remove_peer(peer_addr); + } + Event::PeerMessage { + from, + direction, + payload, + } => { + let msg_cmd = payload.cmd(); + + tracing::trace!(?from, "Recv {msg_cmd}"); + + match self.process_network_message(from, direction, payload).await { + Ok(action) => self.do_sync_action(action), + Err(err) => { + tracing::error!(?from, ?err, "Failed to process peer message: {msg_cmd}"); + } + } + } + } + } + fn perform_periodic_actions(&mut self) { let sync_action = self.chain_sync.on_tick(); self.do_sync_action(sync_action); + if let Some(SlowPeer { peer_id, peer_latency, @@ -128,13 +167,24 @@ where self.peer_manager.update_last_eviction(); self.chain_sync.remove_peer(peer_id); } + + for (peer, txids) in self + .transaction_manager + .on_tick(self.peer_manager.connected_peers()) + { + tracing::debug!("Broadcasting transaction IDs {txids:?} to {peer:?}"); + let msg = NetworkMessage::Inv(txids.into_iter().map(Inventory::Transaction).collect()); + if let Err(err) = self.send(peer, msg) { + self.peer_manager.disconnect(peer, err); + } + } } - fn process_worker_message(&self, worker_msg: NetworkWorkerMessage, bandwidth: &Bandwidth) { + fn process_worker_message(&mut self, worker_msg: NetworkWorkerMessage, bandwidth: &Bandwidth) { match worker_msg { NetworkWorkerMessage::NetworkStatus(result_sender) => { let net_status = NetworkStatus { - num_connected_peers: self.peer_manager.connect_peers_count(), + num_connected_peers: self.peer_manager.connected_peers_count(), total_bytes_inbound: bandwidth.total_bytes_inbound.load(Ordering::Relaxed), total_bytes_outbound: bandwidth.total_bytes_outbound.load(Ordering::Relaxed), sync_status: self.chain_sync.sync_status(), @@ -148,37 +198,18 @@ where NetworkWorkerMessage::InboundPeersCount(result_sender) => { let _ = result_sender.send(self.peer_manager.inbound_peers_count()); } - } - } - - async fn process_event(&mut self, event: Event) { - match event { - Event::NewConnection(new_connection) => { - self.peer_manager.on_new_connection(new_connection); - } - Event::OutboundConnectionFailure { peer_addr, reason } => { - self.peer_manager - .on_outbound_connection_failure(peer_addr, reason); - } - Event::Disconnect { peer_addr, reason } => { - self.peer_manager.disconnect(peer_addr, reason); - self.chain_sync.remove_peer(peer_addr); + NetworkWorkerMessage::GetTransaction((txid, result_sender)) => { + let _ = result_sender.send(self.transaction_manager.get_transaction(&txid)); } - Event::PeerMessage { - from, - direction, - payload, - } => { - let msg_cmd = payload.cmd(); - - tracing::trace!(?from, "Recv {msg_cmd}"); - - match self.process_network_message(from, direction, payload).await { - Ok(action) => self.do_sync_action(action), - Err(err) => { - tracing::error!(?from, ?err, "Failed to process peer message: {msg_cmd}"); - } - } + NetworkWorkerMessage::SendTransaction((incoming_transaction, result_sender)) => { + let send_transaction_result = match self + .transaction_manager + .add_transaction(incoming_transaction) + { + Ok(txid) => SendTransactionResult::Success(txid), + Err(error_msg) => SendTransactionResult::Failure(error_msg), + }; + let _ = result_sender.send(send_transaction_result); } } } @@ -220,12 +251,22 @@ where self.peer_manager.on_addr(from, addresses); Ok(SyncAction::None) } - NetworkMessage::Tx(_tx) => { - // TODO: handle tx + NetworkMessage::Tx(tx) => { + // TODO: Check has relay permission. + let incoming_transaction = IncomingTransaction { + txid: tx.compute_txid(), + transaction: tx, + }; + if let Err(err_msg) = self + .transaction_manager + .add_transaction(incoming_transaction) + { + tracing::debug!(?from, "Failed to add transaction: {err_msg}"); + } Ok(SyncAction::None) } NetworkMessage::GetData(inv) => { - self.process_get_data(inv); + self.process_get_data(from, inv); Ok(SyncAction::None) } NetworkMessage::GetBlocks(_) => { @@ -280,7 +321,7 @@ where self.send(from, NetworkMessage::FeeFilter(1000))?; Ok(SyncAction::None) } - NetworkMessage::Inv(inv) => Ok(self.chain_sync.on_inv(inv, from)), + NetworkMessage::Inv(inv) => self.process_inv(from, inv), NetworkMessage::Block(block) => Ok(self.chain_sync.on_block(block, from)), NetworkMessage::Headers(headers) => Ok(self.chain_sync.on_headers(headers, from)), NetworkMessage::MerkleBlock(_) => Ok(SyncAction::None), @@ -371,17 +412,35 @@ where } } - fn process_get_data(&self, get_data_requests: Vec) { - // TODO: process tx as many as possible. + fn process_inv(&mut self, from: PeerId, inv: Vec) -> Result { + if inv.len() > MAX_INV_SIZE { + return Ok(SyncAction::Disconnect(from, Error::TooManyInventoryItems)); + } - // TODO: process one BLOCK item per call, as Bitcore Core does. - for get_block_msg in get_data_requests.iter().filter(|inv| { - matches!( - inv, - Inventory::Block(_) | Inventory::CompactBlock(_) | Inventory::WitnessBlock(_) - ) - }) { - self.process_get_block_data(get_block_msg); + Ok(self.chain_sync.on_inv(inv, from)) + } + + fn process_get_data(&self, from: PeerId, get_data_requests: Vec) { + // TODO: process tx as many as possible. + for inv in get_data_requests { + match inv { + Inventory::Block(_) | Inventory::CompactBlock(_) | Inventory::WitnessBlock(_) => { + // TODO: process one BLOCK item per call, as Bitcore Core does. + self.process_get_block_data(&inv); + } + Inventory::Transaction(txid) => { + tracing::debug!("Recv transaction request: {txid:?} from {from:?}"); + if let Some(transaction) = self.transaction_manager.get_transaction(&txid) { + if let Err(err) = self.send(from, NetworkMessage::Tx(transaction)) { + tracing::error!(?err, "Failed to send transaction {txid} to {from:?}"); + } + } + } + Inventory::WTx(_) + | Inventory::WitnessTransaction(_) + | Inventory::Unknown { .. } + | Inventory::Error => {} + } } } diff --git a/crates/subcoin-rpc/src/error.rs b/crates/subcoin-rpc/src/error.rs index 9b2c16398b2fa..9c2bd31481c67 100644 --- a/crates/subcoin-rpc/src/error.rs +++ b/crates/subcoin-rpc/src/error.rs @@ -1,3 +1,4 @@ +use bitcoin::consensus::encode::FromHexError; use jsonrpsee::types::error::ErrorObject; use jsonrpsee::types::ErrorObjectOwned; @@ -15,6 +16,10 @@ pub enum Error { Header(subcoin_primitives::HeaderError), #[error(transparent)] Blockchain(#[from] sp_blockchain::Error), + #[error(transparent)] + DecodeHex(#[from] FromHexError), + #[error(transparent)] + SerdeJson(#[from] serde_json::Error), /// Client error. #[error("Client error: {0}")] Client(#[from] Box), diff --git a/crates/subcoin-rpc/src/subcoin.rs b/crates/subcoin-rpc/src/subcoin.rs index a3fa985c611d7..bdec55888c5a7 100644 --- a/crates/subcoin-rpc/src/subcoin.rs +++ b/crates/subcoin-rpc/src/subcoin.rs @@ -1,13 +1,18 @@ use crate::error::Error; +use bitcoin::consensus::encode::{deserialize_hex, serialize_hex}; +use bitcoin::{Transaction, Txid}; use jsonrpsee::proc_macros::rpc; use sc_client_api::{AuxStore, BlockBackend, HeaderBackend}; use serde::{Deserialize, Serialize}; use sp_runtime::traits::Block as BlockT; use std::marker::PhantomData; use std::sync::Arc; -use subcoin_network::{NetworkHandle, NetworkStatus, PeerSync, PeerSyncState}; +use subcoin_network::{ + NetworkHandle, NetworkStatus, PeerSync, PeerSyncState, SendTransactionResult, +}; #[derive(Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct NetworkPeers { total: usize, available: usize, @@ -18,6 +23,18 @@ pub struct NetworkPeers { #[rpc(client, server)] pub trait SubcoinApi { + /// Returns a JSON object representing the serialized, hex-encoded transaction. + /// + /// # Arguments + /// + /// - `raw_tx`: The transaction hex string. + #[method(name = "subcoin_decodeRawTransaction", blocking)] + fn decode_raw_transaction(&self, raw_tx: String) -> Result; + + /// Returns the raw transaction data for given txid. + #[method(name = "subcoin_getRawTransaction")] + async fn get_raw_transaction(&self, txid: Txid) -> Result, Error>; + /// Get overall network status. #[method(name = "subcoin_networkStatus")] async fn network_status(&self) -> Result, Error>; @@ -25,6 +42,14 @@ pub trait SubcoinApi { /// Get the sync peers. #[method(name = "subcoin_networkPeers")] async fn network_peers(&self) -> Result; + + /// Submits a raw transaction (serialized, hex-encoded) to local node and network. + /// + /// # Arguments + /// + /// - `raw_tx`: The hex string of the raw transaction. + #[method(name = "subcoin_sendRawTransaction")] + async fn send_raw_transaction(&self, raw_tx: String) -> Result; } /// This struct provides the Subcoin API. @@ -56,8 +81,14 @@ where Block: BlockT + 'static, Client: HeaderBackend + BlockBackend + AuxStore + 'static, { - async fn network_status(&self) -> Result, Error> { - Ok(self.network_handle.status().await) + fn decode_raw_transaction(&self, raw_tx: String) -> Result { + let transaction = deserialize_hex::(&raw_tx)?; + Ok(serde_json::to_value(transaction)?) + } + + async fn get_raw_transaction(&self, txid: Txid) -> Result, Error> { + let maybe_transaction = self.network_handle.get_transaction(txid).await; + Ok(maybe_transaction.as_ref().map(serialize_hex)) } async fn network_peers(&self) -> Result { @@ -87,4 +118,15 @@ where sync_peers, }) } + + async fn network_status(&self) -> Result, Error> { + Ok(self.network_handle.status().await) + } + + async fn send_raw_transaction(&self, raw_tx: String) -> Result { + Ok(self + .network_handle + .send_transaction(deserialize_hex::(&raw_tx)?) + .await) + } }