Skip to content

Commit

Permalink
Merge branch 'development' into core-fix-0-conf-txns
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Jan 4, 2022
2 parents 01ec159 + aeeefbb commit 41c02fa
Show file tree
Hide file tree
Showing 20 changed files with 176 additions and 266 deletions.
61 changes: 0 additions & 61 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion applications/tari_base_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ tracing-subscriber = "0.2.20"

# Metrics
tari_metrics = { path = "../../infrastructure/metrics", optional = true }
warp = { version = "0.3.1", optional = true }
warp = { version = "0.3.1", optional = true, default-features = false }
reqwest = { version = "0.11.4", default-features = false, optional = true }

[features]
Expand Down
15 changes: 8 additions & 7 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ impl LMDBDatabase {
&self,
txn: &WriteTransaction<'_>,
height: u64,
header_hash: HashOutput,
header_hash: &HashOutput,
input: &TransactionInput,
mmr_position: u32,
) -> Result<(), ChainStorageError> {
Expand All @@ -657,7 +657,7 @@ impl LMDBDatabase {
txn,
&self.deleted_txo_mmr_position_to_height_index,
&mmr_position,
&(height, &header_hash),
&(height, header_hash),
"deleted_txo_mmr_position_to_height_index",
)?;

Expand Down Expand Up @@ -709,7 +709,7 @@ impl LMDBDatabase {
}

let hash = input.hash();
let key = InputKey::new(&header_hash, mmr_position, &hash);
let key = InputKey::new(header_hash, mmr_position, &hash);
lmdb_insert(
txn,
&*self.inputs_db,
Expand All @@ -718,7 +718,7 @@ impl LMDBDatabase {
input,
header_hash,
mmr_position,
hash,
hash: &hash,
},
"inputs_db",
)
Expand Down Expand Up @@ -1192,9 +1192,10 @@ impl LMDBDatabase {
let mut spent_zero_conf_commitments = Vec::new();
// unique_id_index expects inputs to be inserted before outputs
for input in &inputs {
let index = match self.fetch_mmr_leaf_index(&**txn, MmrTree::Utxo, &input.output_hash())? {
let output_hash = input.output_hash();
let index = match self.fetch_mmr_leaf_index(&**txn, MmrTree::Utxo, &output_hash)? {
Some(index) => index,
None => match output_mmr.find_leaf_index(&input.output_hash())? {
None => match output_mmr.find_leaf_index(&output_hash)? {
Some(index) => {
debug!(
target: LOG_TARGET,
Expand All @@ -1213,7 +1214,7 @@ impl LMDBDatabase {
)));
}
debug!(target: LOG_TARGET, "Inserting input `{}`", input.commitment.to_hex());
self.insert_input(txn, current_header_at_height.height, block_hash.clone(), input, index)?;
self.insert_input(txn, current_header_at_height.height, &block_hash, input, index)?;
}

for (output, mmr_count) in outputs {
Expand Down
8 changes: 6 additions & 2 deletions base_layer/core/src/chain_storage/lmdb_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ pub(crate) struct TransactionOutputRowData {
pub mined_height: u64,
}

/// Transaction input row data taking references and used for serialization.
/// This struct must mirror the fields in `TransactionInputRowData`
#[derive(Serialize, Debug)]
pub(crate) struct TransactionInputRowDataRef<'a> {
pub input: &'a TransactionInput,
pub header_hash: HashOutput,
#[allow(clippy::ptr_arg)]
pub header_hash: &'a HashOutput,
pub mmr_position: u32,
pub hash: HashOutput,
#[allow(clippy::ptr_arg)]
pub hash: &'a HashOutput,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down
4 changes: 2 additions & 2 deletions base_layer/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub use tari_common::configuration::Network;
pub const DEFAULT_DNS_NAME_SERVER: &str = "1.1.1.1:853/cloudflare-dns.com";

/// Major network version. Peers will refuse connections if this value differs
pub const MAJOR_NETWORK_VERSION: u32 = 0;
pub const MAJOR_NETWORK_VERSION: u8 = 0;
/// Minor network version. This should change with each time the network protocol has changed in a backward-compatible
/// way.
pub const MINOR_NETWORK_VERSION: u32 = 0;
pub const MINOR_NETWORK_VERSION: u8 = 0;
14 changes: 13 additions & 1 deletion comms/dht/tests/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,11 @@ async fn dht_propagate_dedup() {

let mut node_A_messaging = node_A.messaging_events.subscribe();
let mut node_B_messaging = node_B.messaging_events.subscribe();
let mut node_B_messaging2 = node_B.messaging_events.subscribe();
let mut node_C_messaging = node_C.messaging_events.subscribe();
let mut node_C_messaging2 = node_C.messaging_events.subscribe();
let mut node_D_messaging = node_D.messaging_events.subscribe();
let mut node_D_messaging2 = node_D.messaging_events.subscribe();

#[derive(Clone, PartialEq, ::prost::Message)]
struct Person {
Expand Down Expand Up @@ -596,6 +599,11 @@ async fn dht_propagate_dedup() {
let node_C_id = node_C.node_identity().node_id().clone();
let node_D_id = node_D.node_identity().node_id().clone();

// Ensure that the message has propagated before disconnecting everyone
let _ = node_B_messaging2.recv().await.unwrap();
let _ = node_C_messaging2.recv().await.unwrap();
let _ = node_D_messaging2.recv().await.unwrap();

node_A.shutdown().await;
node_B.shutdown().await;
node_C.shutdown().await;
Expand All @@ -611,7 +619,11 @@ async fn dht_propagate_dedup() {
let received = filter_received(collect_try_recv!(node_B_messaging, timeout = Duration::from_secs(20)));
let recv_count = count_messages_received(&received, &[&node_A_id, &node_C_id]);
// Expected race condition: If A->B->C before A->C then C->B does not happen
assert!((1..=2).contains(&recv_count));
assert!(
(1..=2).contains(&recv_count),
"expected recv_count to be in [1-2] but was {}",
recv_count
);

let received = filter_received(collect_try_recv!(node_C_messaging, timeout = Duration::from_secs(20)));
let recv_count = count_messages_received(&received, &[&node_A_id, &node_B_id]);
Expand Down
2 changes: 1 addition & 1 deletion comms/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl CommsBuilder {
}

/// Set a network major and minor version as per [RFC-173 Versioning](https://rfc.tari.com/RFC-0173_Versioning.html)
pub fn with_node_version(mut self, major_version: u32, minor_version: u32) -> Self {
pub fn with_node_version(mut self, major_version: u8, minor_version: u8) -> Self {
self.connection_manager_config.network_info.major_version = major_version;
self.connection_manager_config.network_info.minor_version = minor_version;
self
Expand Down
25 changes: 9 additions & 16 deletions comms/src/connection_manager/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@

use std::{convert::TryFrom, net::Ipv6Addr};

use futures::StreamExt;
use log::*;
use tokio::io::{AsyncRead, AsyncWrite};

use super::types::ConnectionDirection;
use crate::{
connection_manager::error::ConnectionManagerError,
multiaddr::{Multiaddr, Protocol},
multiplexing::Yamux,
peer_manager::{NodeId, NodeIdentity, Peer, PeerFeatures, PeerFlags},
proto::identity::PeerIdentityMsg,
protocol,
Expand All @@ -43,30 +42,24 @@ const LOG_TARGET: &str = "comms::connection_manager::common";
/// The maximum size of the peer's user agent string. If the peer sends a longer string it is truncated.
const MAX_USER_AGENT_LEN: usize = 100;

pub async fn perform_identity_exchange<'p, P: IntoIterator<Item = &'p ProtocolId>>(
muxer: &mut Yamux,
pub async fn perform_identity_exchange<
'p,
P: IntoIterator<Item = &'p ProtocolId>,
TSocket: AsyncRead + AsyncWrite + Unpin,
>(
socket: &mut TSocket,
node_identity: &NodeIdentity,
direction: ConnectionDirection,
our_supported_protocols: P,
network_info: NodeNetworkInfo,
) -> Result<PeerIdentityMsg, ConnectionManagerError> {
let mut control = muxer.get_yamux_control();
let stream = match direction {
ConnectionDirection::Inbound => muxer
.incoming_mut()
.next()
.await
.ok_or(ConnectionManagerError::IncomingListenerStreamClosed)?,
ConnectionDirection::Outbound => control.open_stream().await?,
};

debug!(
target: LOG_TARGET,
"{} substream opened to peer. Performing identity exchange.", direction
"{} socket opened to peer. Performing identity exchange.", direction
);

let peer_identity =
protocol::identity_exchange(node_identity, direction, our_supported_protocols, network_info, stream).await?;
protocol::identity_exchange(node_identity, direction, our_supported_protocols, network_info, socket).await?;

Ok(peer_identity)
}
Expand Down
30 changes: 15 additions & 15 deletions comms/src/connection_manager/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ where
async fn perform_socket_upgrade_procedure(
peer_manager: Arc<PeerManager>,
node_identity: Arc<NodeIdentity>,
socket: NoiseSocket<TTransport::Output>,
mut socket: NoiseSocket<TTransport::Output>,
dialed_addr: Multiaddr,
authenticated_public_key: CommsPublicKey,
conn_man_notifier: mpsc::Sender<ConnectionManagerEvent>,
Expand All @@ -361,43 +361,36 @@ where
cancel_signal: ShutdownSignal,
) -> Result<PeerConnection, ConnectionManagerError> {
static CONNECTION_DIRECTION: ConnectionDirection = ConnectionDirection::Outbound;
let mut muxer = Yamux::upgrade_connection(socket, CONNECTION_DIRECTION)
.await
.map_err(|err| ConnectionManagerError::YamuxUpgradeFailure(err.to_string()))?;

debug!(
target: LOG_TARGET,
"Starting peer identity exchange for peer with public key '{}'", authenticated_public_key
);
if cancel_signal.is_terminated() {
return Err(ConnectionManagerError::DialCancelled);
}

// Check if we know the peer and if it is banned
let known_peer = common::find_unbanned_peer(&peer_manager, &authenticated_public_key).await?;

let peer_identity = common::perform_identity_exchange(
&mut muxer,
&mut socket,
&node_identity,
CONNECTION_DIRECTION,
&our_supported_protocols,
config.network_info.clone(),
)
.await?;

if cancel_signal.is_terminated() {
muxer.get_yamux_control().close().await?;
return Err(ConnectionManagerError::DialCancelled);
}

let features = PeerFeatures::from_bits_truncate(peer_identity.features);
trace!(
debug!(
target: LOG_TARGET,
"Peer identity exchange succeeded on Outbound connection for peer '{}' (Features = {:?})",
authenticated_public_key,
features
);
trace!(target: LOG_TARGET, "{:?}", peer_identity);

// Check if we know the peer and if it is banned
let known_peer = common::find_unbanned_peer(&peer_manager, &authenticated_public_key).await?;

let (peer_node_id, their_supported_protocols) = common::validate_and_add_peer_from_peer_identity(
&peer_manager,
known_peer,
Expand All @@ -409,7 +402,6 @@ where
.await?;

if cancel_signal.is_terminated() {
muxer.get_yamux_control().close().await?;
return Err(ConnectionManagerError::DialCancelled);
}

Expand All @@ -420,6 +412,14 @@ where
peer_node_id.short_str()
);

let muxer = Yamux::upgrade_connection(socket, CONNECTION_DIRECTION)
.map_err(|err| ConnectionManagerError::YamuxUpgradeFailure(err.to_string()))?;

if cancel_signal.is_terminated() {
muxer.get_yamux_control().close().await?;
return Err(ConnectionManagerError::DialCancelled);
}

peer_connection::create(
muxer,
dialed_addr,
Expand Down
Loading

0 comments on commit 41c02fa

Please sign in to comment.