Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Refactor network transactions handling #5939

Merged
merged 3 commits into from
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multia
#[doc(hidden)]
pub use crate::protocol::ProtocolConfig;

use crate::{ExHashT, ReportHandle};
use crate::ExHashT;

use core::{fmt, iter};
use futures::future;
use libp2p::identity::{ed25519, Keypair};
use libp2p::wasm_ext;
use libp2p::{multiaddr, Multiaddr, PeerId};
use prometheus_endpoint::Registry;
use sc_peerset::ReputationChange;
use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue};
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use std::{borrow::Cow, convert::TryFrom, future::Future, pin::Pin, str::FromStr};
Expand Down Expand Up @@ -167,6 +167,22 @@ impl<B: BlockT> FinalityProofRequestBuilder<B> for DummyFinalityProofRequestBuil
/// Shared finality proof request builder struct used by the queue.
pub type BoxFinalityProofRequestBuilder<B> = Box<dyn FinalityProofRequestBuilder<B> + Send + Sync>;

/// Result of the transaction import.
#[derive(Clone, Copy, Debug)]
pub enum TransactionImport {
/// Transaction is good but already known by the transaction pool.
KnownGood,
/// Transaction is good and not yet known.
NewGood,
/// Transaction is invalid.
Bad,
/// Transaction import was not performed.
None,
}

/// Fuure resolving to transaction import result.
pub type TransactionImportFuture = Pin<Box<dyn Future<Output=TransactionImport> + Send>>;

/// Transaction pool interface
pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
/// Get transactions from the pool that are ready to be propagated.
Expand All @@ -175,15 +191,11 @@ pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
fn hash_of(&self, transaction: &B::Extrinsic) -> H;
/// Import a transaction into the pool.
///
/// Peer reputation is changed by reputation_change if transaction is accepted by the pool.
/// This will return future.
fn import(
&self,
report_handle: ReportHandle,
who: PeerId,
reputation_change_good: ReputationChange,
reputation_change_bad: ReputationChange,
transaction: B::Extrinsic,
);
) -> TransactionImportFuture;
/// Notify the pool about transactions broadcast.
fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>);
/// Get transaction by hash.
Expand All @@ -209,12 +221,10 @@ impl<H: ExHashT + Default, B: BlockT> TransactionPool<H, B> for EmptyTransaction

fn import(
&self,
_report_handle: ReportHandle,
_who: PeerId,
_rep_change_good: ReputationChange,
_rep_change_bad: ReputationChange,
_transaction: B::Extrinsic
) {}
) -> TransactionImportFuture {
Box::pin(future::ready(TransactionImport::KnownGood))
}

fn on_broadcasted(&self, _: HashMap<H, Vec<String>>) {}

Expand Down
76 changes: 64 additions & 12 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
use crate::{
ExHashT,
chain::{Client, FinalityProofProvider},
config::{BoxFinalityProofRequestBuilder, ProtocolId, TransactionPool},
config::{BoxFinalityProofRequestBuilder, ProtocolId, TransactionPool, TransactionImportFuture, TransactionImport},
error,
utils::interval
};

use bytes::{Bytes, BytesMut};
use futures::prelude::*;
use futures::{prelude::*, stream::FuturesUnordered};
use generic_proto::{GenericProto, GenericProtoOut};
use libp2p::{Multiaddr, PeerId};
use libp2p::core::{ConnectedPoint, connection::{ConnectionId, ListenerId}};
Expand Down Expand Up @@ -78,6 +78,9 @@ const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead
/// Maximim number of known extrinsic hashes to keep for a peer.
const MAX_KNOWN_EXTRINSICS: usize = 4096; // ~128kb per peer + overhead

/// Maximim number of transaction validation request we keep at any moment.
const MAX_PENDING_TRANSACTIONS: usize = 8192;

/// Current protocol version.
pub(crate) const CURRENT_VERSION: u32 = 6;
/// Lowest version we support
Expand All @@ -101,6 +104,13 @@ mod rep {
pub const UNEXPECTED_STATUS: Rep = Rep::new(-(1 << 20), "Unexpected status message");
/// Reputation change when we are a light client and a peer is behind us.
pub const PEER_BEHIND_US_LIGHT: Rep = Rep::new(-(1 << 8), "Useless for a light peer");
/// Reputation change when a peer sends us any extrinsic.
///
/// This forces node to verify it, thus the negative value here. Once extrinsic is verified,
/// reputation change should be refunded with `ANY_EXTRINSIC_REFUND`
pub const ANY_EXTRINSIC: Rep = Rep::new(-(1 << 4), "Any extrinsic");
/// Reputation change when a peer sends us any extrinsic that is not invalid.
pub const ANY_EXTRINSIC_REFUND: Rep = Rep::new(1 << 4, "Any extrinsic (refund)");
/// Reputation change when a peer sends us an extrinsic that we didn't know about.
pub const GOOD_EXTRINSIC: Rep = Rep::new(1 << 7, "Good extrinsic");
/// Reputation change when a peer sends us a bad extrinsic.
Expand Down Expand Up @@ -182,6 +192,24 @@ impl Metrics {
}
}

struct PendingTransaction {
validation: TransactionImportFuture,
peer_id: PeerId,
}

impl Future for PendingTransaction {
type Output = (PeerId, TransactionImport);

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
if let Poll::Ready(import_result) = this.validation.poll_unpin(cx) {
return Poll::Ready((this.peer_id.clone(), import_result));
}

Poll::Pending
}
}

// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, H: ExHashT> {
/// Interval at which we call `tick`.
Expand All @@ -190,6 +218,8 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
propagate_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
/// Pending list of messages to return from `poll` as a priority.
pending_messages: VecDeque<CustomMessageOutcome<B>>,
/// Pending extrinsic verification tasks.
pending_transactions: FuturesUnordered<PendingTransaction>,
config: ProtocolConfig,
genesis_hash: B::Hash,
sync: ChainSync<B>,
Expand Down Expand Up @@ -394,6 +424,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)),
pending_messages: VecDeque::new(),
pending_transactions: FuturesUnordered::new(),
config,
context_data: ContextData {
peers: HashMap::new(),
Expand Down Expand Up @@ -1118,20 +1149,37 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who);
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
for t in extrinsics {
if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS {
debug!(
target: "sync",
"Ignoring any further transactions that exceed `MAX_PENDING_TRANSACTIONS`({}) limit",
MAX_PENDING_TRANSACTIONS,
);
break;
}

let hash = self.transaction_pool.hash_of(&t);
peer.known_extrinsics.insert(hash);

self.transaction_pool.import(
self.peerset_handle.clone().into(),
who.clone(),
rep::GOOD_EXTRINSIC,
rep::BAD_EXTRINSIC,
t,
);
self.peerset_handle.report_peer(who.clone(), rep::ANY_EXTRINSIC);

self.pending_transactions.push(PendingTransaction {
peer_id: who.clone(),
validation: self.transaction_pool.import(t),
});
}
}
}

fn on_handle_extrinsic_import(&mut self, who: PeerId, import: TransactionImport) {
match import {
TransactionImport::KnownGood => self.peerset_handle.report_peer(who, rep::ANY_EXTRINSIC_REFUND),
TransactionImport::NewGood => self.peerset_handle.report_peer(who, rep::GOOD_EXTRINSIC),
TransactionImport::Bad => self.peerset_handle.report_peer(who, rep::BAD_EXTRINSIC),
TransactionImport::None => {},
}
}

/// Propagate one extrinsic.
pub fn propagate_extrinsic(
&mut self,
Expand Down Expand Up @@ -1953,7 +2001,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
&mut self.context_data.stats,
&mut self.context_data.peers,
&id,
GenericMessage::BlockRequest(r)
GenericMessage::BlockRequest(r),
)
}
}
Expand All @@ -1970,7 +2018,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
&mut self.context_data.stats,
&mut self.context_data.peers,
&id,
GenericMessage::BlockRequest(r)
GenericMessage::BlockRequest(r),
)
}
}
Expand All @@ -1988,9 +2036,13 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
&mut self.context_data.stats,
&mut self.context_data.peers,
&id,
GenericMessage::FinalityProofRequest(r))
GenericMessage::FinalityProofRequest(r),
)
}
}
if let Poll::Ready(Some((peer_id, result))) = self.pending_transactions.poll_next_unpin(cx) {
self.on_handle_extrinsic_import(peer_id, result);
}
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
}
Expand Down
1 change: 0 additions & 1 deletion client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,6 @@ ServiceBuilder<
imports_external_transactions: !matches!(config.role, Role::Light),
pool: transaction_pool.clone(),
client: client.clone(),
executor: task_manager.spawn_handle(),
});

let protocol_id = {
Expand Down
68 changes: 35 additions & 33 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use futures::{
sink::SinkExt,
task::{Spawn, FutureObj, SpawnError},
};
use sc_network::{NetworkService, network_state::NetworkState, PeerId, ReportHandle};
use sc_network::{NetworkService, network_state::NetworkState, PeerId};
use log::{log, warn, debug, error, Level};
use codec::{Encode, Decode};
use sp_runtime::generic::BlockId;
Expand All @@ -76,7 +76,10 @@ pub use sc_executor::NativeExecutionDispatch;
#[doc(hidden)]
pub use std::{ops::Deref, result::Result, sync::Arc};
#[doc(hidden)]
pub use sc_network::config::{FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder};
pub use sc_network::config::{
FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder, TransactionImport,
TransactionImportFuture,
};
pub use sc_tracing::TracingReceiver;
pub use task_manager::SpawnTaskHandle;
use task_manager::TaskManager;
Expand Down Expand Up @@ -616,7 +619,6 @@ pub struct TransactionPoolAdapter<C, P> {
imports_external_transactions: bool,
pool: Arc<P>,
client: Arc<C>,
executor: SpawnTaskHandle,
}

/// Get transactions for propagation.
Expand Down Expand Up @@ -659,42 +661,42 @@ where

fn import(
&self,
report_handle: ReportHandle,
who: PeerId,
reputation_change_good: sc_network::ReputationChange,
reputation_change_bad: sc_network::ReputationChange,
transaction: B::Extrinsic
) {
transaction: B::Extrinsic,
) -> TransactionImportFuture {
if !self.imports_external_transactions {
debug!("Transaction rejected");
return;
Box::pin(futures::future::ready(TransactionImport::None));
}

let encoded = transaction.encode();
match Decode::decode(&mut &encoded[..]) {
Ok(uxt) => {
let best_block_id = BlockId::hash(self.client.info().best_hash);
let source = sp_transaction_pool::TransactionSource::External;
let import_future = self.pool.submit_one(&best_block_id, source, uxt);
let import_future = import_future
.map(move |import_result| {
match import_result {
Ok(_) => report_handle.report_peer(who, reputation_change_good),
Err(e) => match e.into_pool_error() {
Ok(sp_transaction_pool::error::Error::AlreadyImported(_)) => (),
Ok(e) => {
report_handle.report_peer(who, reputation_change_bad);
debug!("Error adding transaction to the pool: {:?}", e)
}
Err(e) => debug!("Error converting pool error: {:?}", e),
}
}
});

self.executor.spawn("extrinsic-import", import_future);
let uxt = match Decode::decode(&mut &encoded[..]) {
Ok(uxt) => uxt,
Err(e) => {
debug!("Transaction invalid: {:?}", e);
return Box::pin(futures::future::ready(TransactionImport::Bad));
}
Err(e) => debug!("Error decoding transaction {}", e),
}
};

let best_block_id = BlockId::hash(self.client.info().best_hash);

let import_future = self.pool.submit_one(&best_block_id, sp_transaction_pool::TransactionSource::External, uxt);
Box::pin(async move {
match import_future.await {
Ok(_) => TransactionImport::NewGood,
Err(e) => match e.into_pool_error() {
Ok(sp_transaction_pool::error::Error::AlreadyImported(_)) => TransactionImport::KnownGood,
Ok(e) => {
debug!("Error adding transaction to the pool: {:?}", e);
TransactionImport::Bad
}
Err(e) => {
debug!("Error converting pool error: {:?}", e);
// it is not bad at least, just some internal node logic error, so peer is innocent.
TransactionImport::KnownGood
}
}
}
})
}

fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>) {
Expand Down