Skip to content

Commit

Permalink
fix: check SAF message inflight and check stored_at is in past (#3444)
Browse files Browse the repository at this point in the history
Description
---
- Keeps track of inflight SAF requests and only accepts responses for
  requests that are inflight
- Checks that `stored_at` is in the past
- Fixes #3412, #3410 

Motivation and Context
---
See #3412, #3410

How Has This Been Tested?
---
- New/existing unit/integration tests
- memorynet
- Manually
  • Loading branch information
sdbondi authored Oct 24, 2021
1 parent b33e8b5 commit fbf8eb8
Show file tree
Hide file tree
Showing 25 changed files with 564 additions and 170 deletions.
7 changes: 5 additions & 2 deletions applications/tari_base_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use log::*;
use tari_app_utilities::{consts, identity_management, utilities::create_transport_type};
use tari_common::{configuration::bootstrap::ApplicationType, GlobalConfig};
use tari_comms::{peer_manager::Peer, protocol::rpc::RpcServer, NodeIdentity, UnspawnedCommsNode};
use tari_comms_dht::{DbConnectionUrl, Dht, DhtConfig};
use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, Dht, DhtConfig};
use tari_core::{
base_node,
base_node::{
Expand Down Expand Up @@ -251,7 +251,10 @@ where B: BlockchainBackend + 'static
auto_join: true,
allow_test_addresses: self.config.allow_test_addresses,
flood_ban_max_msg_count: self.config.flood_ban_max_msg_count,
saf_msg_validity: self.config.saf_expiry_duration,
saf_config: SafConfig {
msg_validity: self.config.saf_expiry_duration,
..Default::default()
},
dedup_cache_capacity: self.config.dedup_cache_capacity,
..Default::default()
},
Expand Down
7 changes: 5 additions & 2 deletions applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use tari_comms::{
types::CommsSecretKey,
NodeIdentity,
};
use tari_comms_dht::{DbConnectionUrl, DhtConfig};
use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, DhtConfig};
use tari_core::transactions::CryptoFactories;
use tari_p2p::{
auto_update::AutoUpdateConfig,
Expand Down Expand Up @@ -337,7 +337,10 @@ pub async fn init_wallet(
auto_join: true,
allow_test_addresses: config.allow_test_addresses,
flood_ban_max_msg_count: config.flood_ban_max_msg_count,
saf_msg_validity: config.saf_expiry_duration,
saf_config: SafConfig {
msg_validity: config.saf_expiry_duration,
..Default::default()
},
dedup_cache_capacity: config.dedup_cache_capacity,
..Default::default()
},
Expand Down
7 changes: 5 additions & 2 deletions base_layer/wallet/tests/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use tari_comms::{
peer_manager::{NodeId, NodeIdentity, Peer, PeerFeatures, PeerFlags},
types::{CommsPublicKey, CommsSecretKey},
};
use tari_comms_dht::DhtConfig;
use tari_comms_dht::{store_forward::SafConfig, DhtConfig};
use tari_core::transactions::{
tari_amount::{uT, MicroTari},
test_helpers::{create_unblinded_output, TestParams},
Expand Down Expand Up @@ -119,7 +119,10 @@ async fn create_wallet(
dht: DhtConfig {
discovery_request_timeout: Duration::from_secs(1),
auto_join: true,
saf_auto_request: true,
saf_config: SafConfig {
auto_request: true,
..Default::default()
},
..Default::default()
},
allow_test_addresses: true,
Expand Down
7 changes: 5 additions & 2 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ use tari_comms::{
transports::MemoryTransport,
types::CommsSecretKey,
};
use tari_comms_dht::{DbConnectionUrl, DhtConfig};
use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, DhtConfig};
use tari_core::transactions::{tari_amount::MicroTari, transaction::OutputFeatures, CryptoFactories};
use tari_p2p::{
transport::{TorConfig, TransportType, TransportType::Tor},
Expand Down Expand Up @@ -2591,7 +2591,10 @@ pub unsafe extern "C" fn comms_config_create(
discovery_request_timeout: Duration::from_secs(discovery_timeout_in_secs),
database_url: DbConnectionUrl::File(dht_database_path),
auto_join: true,
saf_msg_validity: Duration::from_secs(saf_message_duration_in_secs),
saf_config: SafConfig {
msg_validity: Duration::from_secs(saf_message_duration_in_secs),
..Default::default()
},
..Default::default()
},
// TODO: This should be set to false for non-test wallets. See the `allow_test_addresses` field
Expand Down
6 changes: 5 additions & 1 deletion comms/dht/examples/memory_net/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use tari_comms_dht::{
envelope::NodeDestination,
inbound::DecryptedDhtMessage,
outbound::OutboundEncryption,
store_forward::SafConfig,
Dht,
DhtConfig,
};
Expand Down Expand Up @@ -912,7 +913,10 @@ async fn setup_comms_dht(

let dht = Dht::builder()
.with_config(DhtConfig {
saf_auto_request,
saf_config: SafConfig {
auto_request: saf_auto_request,
..Default::default()
},
auto_join: false,
discovery_request_timeout: Duration::from_secs(15),
num_neighbouring_nodes,
Expand Down
1 change: 1 addition & 0 deletions comms/dht/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ impl DhtActor {
.map(|p| p.node_id)
.collect())
},
SelectedPeers(peers) => Ok(peers),
Broadcast(exclude) => {
let connections = connectivity
.select_connections(ConnectivitySelection::random_nodes(
Expand Down
2 changes: 2 additions & 0 deletions comms/dht/src/broadcast_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub enum BroadcastStrategy {
/// Send directly to destination if connected but otherwise send to all n nearest Communication Nodes
DirectOrClosestNodes(Box<BroadcastClosestRequest>),
Broadcast(Vec<NodeId>),
SelectedPeers(Vec<NodeId>),
/// Propagate to a set of closest neighbours and random peers
Propagate(NodeDestination, Vec<NodeId>),
}
Expand All @@ -77,6 +78,7 @@ impl fmt::Display for BroadcastStrategy {
Random(n, excluded) => write!(f, "Random({}, {} excluded)", n, excluded.len()),
Broadcast(excluded) => write!(f, "Broadcast({} excluded)", excluded.len()),
Propagate(destination, excluded) => write!(f, "Propagate({}, {} excluded)", destination, excluded.len(),),
SelectedPeers(peers) => write!(f, "SelectedPeers({} peer(s))", peers.len()),
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion comms/dht/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl DhtBuilder {
}

pub fn set_auto_store_and_forward_requests(&mut self, enabled: bool) -> &mut Self {
self.config.saf_auto_request = enabled;
self.config.saf_config.auto_request = enabled;
self
}

Expand Down Expand Up @@ -112,6 +112,7 @@ impl DhtBuilder {

pub fn with_num_neighbouring_nodes(&mut self, n: usize) -> &mut Self {
self.config.num_neighbouring_nodes = n;
self.config.saf_config.num_neighbouring_nodes = n;
self
}

Expand Down
50 changes: 14 additions & 36 deletions comms/dht/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{network_discovery::NetworkDiscoveryConfig, storage::DbConnectionUrl, version::DhtProtocolVersion};
use crate::{
network_discovery::NetworkDiscoveryConfig,
storage::DbConnectionUrl,
store_forward::SafConfig,
version::DhtProtocolVersion,
};
use std::time::Duration;

#[derive(Debug, Clone)]
Expand All @@ -33,41 +38,18 @@ pub struct DhtConfig {
/// Default: 20
pub outbound_buffer_size: usize,
/// The maximum number of peer nodes that a message has to be closer to, to be considered a neighbour
/// Default: [DEFAULT_NUM_NEIGHBOURING_NODES](self::DEFAULT_NUM_NEIGHBOURING_NODES)
/// Default: 8
pub num_neighbouring_nodes: usize,
/// Number of random peers to include
/// Default: [DEFAULT_NUM_RANDOM_NODES](self::DEFAULT_NUM_RANDOM_NODES)
/// Default: 4
pub num_random_nodes: usize,
/// Send to this many peers when using the broadcast strategy
/// Default: 8
pub broadcast_factor: usize,
/// Send to this many peers when using the propagate strategy
/// Default: 4
pub propagation_factor: usize,
/// The amount of seconds added to the current time (Utc) which will then be used to check if the message has
/// expired or not when processing the message
/// Default: 10800
pub saf_msg_validity: Duration,
/// The maximum number of messages that can be stored using the Store-and-forward middleware.
/// Default: 100,000
pub saf_msg_storage_capacity: usize,
/// A request to retrieve stored messages will be ignored if the requesting node is
/// not within one of this nodes _n_ closest nodes.
/// Default 8
pub saf_num_closest_nodes: usize,
/// The maximum number of messages to return from a store and forward retrieval request.
/// Default: 100
pub saf_max_returned_messages: usize,
/// The time-to-live duration used for storage of low priority messages by the Store-and-forward middleware.
/// Default: 6 hours
pub saf_low_priority_msg_storage_ttl: Duration,
/// The time-to-live duration used for storage of high priority messages by the Store-and-forward middleware.
/// Default: 3 days
pub saf_high_priority_msg_storage_ttl: Duration,
/// The limit on the message size to store in SAF storage in bytes. Default 500 KiB
pub saf_max_message_size: usize,
/// When true, store and forward messages are requested from peers on connect (Default: true)
pub saf_auto_request: bool,
pub saf_config: SafConfig,
/// The max capacity of the message hash cache
/// Default: 2,500
pub dedup_cache_capacity: usize,
Expand Down Expand Up @@ -127,7 +109,10 @@ impl DhtConfig {
pub fn default_local_test() -> Self {
Self {
database_url: DbConnectionUrl::Memory,
saf_auto_request: false,
saf_config: SafConfig {
auto_request: false,
..Default::default()
},
auto_join: false,
network_discovery: NetworkDiscoveryConfig {
// If a test requires the peer probe they should explicitly enable it
Expand All @@ -150,13 +135,7 @@ impl Default for DhtConfig {
propagation_factor: 4,
broadcast_factor: 8,
outbound_buffer_size: 20,
saf_num_closest_nodes: 10,
saf_max_returned_messages: 50,
saf_msg_storage_capacity: 100_000,
saf_low_priority_msg_storage_ttl: Duration::from_secs(6 * 60 * 60), // 6 hours
saf_high_priority_msg_storage_ttl: Duration::from_secs(3 * 24 * 60 * 60), // 3 days
saf_auto_request: true,
saf_max_message_size: 512 * 1024,
saf_config: Default::default(),
dedup_cache_capacity: 2_500,
dedup_cache_trim_interval: Duration::from_secs(5 * 60),
dedup_allowed_message_occurrences: 1,
Expand All @@ -172,7 +151,6 @@ impl Default for DhtConfig {
flood_ban_max_msg_count: 10000,
flood_ban_timespan: Duration::from_secs(100),
offline_peer_cooldown: Duration::from_secs(2 * 60 * 60),
saf_msg_validity: Duration::from_secs(10800),
}
}
}
17 changes: 10 additions & 7 deletions comms/dht/src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl Dht {
saf_response_signal_rx: mpsc::Receiver<()>,
) -> StoreAndForwardService {
StoreAndForwardService::new(
self.config.clone(),
self.config.saf_config.clone(),
conn,
self.peer_manager.clone(),
self.dht_requester(),
Expand Down Expand Up @@ -311,7 +311,7 @@ impl Dht {
self.node_identity.node_id().short_str()
)))
.layer(store_forward::StoreLayer::new(
self.config.clone(),
self.config.saf_config.clone(),
Arc::clone(&self.peer_manager),
Arc::clone(&self.node_identity),
self.store_and_forward_requester(),
Expand All @@ -321,7 +321,7 @@ impl Dht {
self.node_identity.features().contains(PeerFeatures::DHT_STORE_FORWARD),
))
.layer(store_forward::MessageHandlerLayer::new(
self.config.clone(),
self.config.saf_config.clone(),
self.store_and_forward_requester(),
self.dht_requester(),
Arc::clone(&self.node_identity),
Expand Down Expand Up @@ -640,6 +640,12 @@ mod test {
.await
.unwrap();

// SAF messages need to be requested before any response is accepted
dht.store_and_forward_requester()
.request_saf_messages_from_peer(node_identity.node_id().clone())
.await
.unwrap();

let spy = service_spy();
let mut service = dht.inbound_middleware_layer().layer(spy.to_service());

Expand All @@ -652,10 +658,7 @@ mod test {
MessageTag::new(),
false,
);
dht_envelope.header.as_mut().map(|header| {
header.message_type = DhtMessageType::SafStoredMessages as i32;
header
});
dht_envelope.header.as_mut().unwrap().message_type = DhtMessageType::SafStoredMessages as i32;
let inbound_message = make_comms_inbound_message(&node_identity, dht_envelope.to_encoded_bytes().into());

service.call(inbound_message).await.unwrap_err();
Expand Down
1 change: 1 addition & 0 deletions comms/dht/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![feature(map_entry_replace)]
#![doc(html_root_url = "https://docs.rs/tower-filter/0.3.0-alpha.2")]
#![cfg_attr(not(debug_assertions), deny(unused_variables))]
#![cfg_attr(not(debug_assertions), deny(unused_imports))]
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/src/outbound/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl BroadcastLayer {
dht_requester,
dht_discovery_requester,
node_identity,
message_validity_window: chrono::Duration::from_std(config.saf_msg_validity)
message_validity_window: chrono::Duration::from_std(config.saf_config.msg_validity)
.expect("message_validity_window is too large"),
protocol_version: config.protocol_version,
}
Expand Down
6 changes: 6 additions & 0 deletions comms/dht/src/outbound/message_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ impl SendMessageParams {
self
}

/// Set broadcast_strategy to SelectedPeers. Messages are queued for all selected peers.
pub fn selected_peers(&mut self, peers: Vec<NodeId>) -> &mut Self {
self.params_mut().broadcast_strategy = BroadcastStrategy::SelectedPeers(peers);
self
}

/// Set broadcast_strategy to Neighbours. `excluded_peers` are excluded. Only Peers that have
/// `PeerFeatures::MESSAGE_PROPAGATION` are included.
pub fn broadcast(&mut self, excluded_peers: Vec<NodeId>) -> &mut Self {
Expand Down
73 changes: 73 additions & 0 deletions comms/dht/src/store_forward/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2021, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Duration;

#[derive(Debug, Clone)]
pub struct SafConfig {
/// The amount of seconds added to the current time (Utc) which will then be used to check if the message has
/// expired or not when processing the message
/// Default: 3 hours
pub msg_validity: Duration,
/// The maximum number of messages that can be stored using the Store-and-forward middleware.
/// Default: 100,000
pub msg_storage_capacity: usize,
/// A request to retrieve stored messages will be ignored if the requesting node is
/// not within one of this nodes _n_ closest nodes.
/// Default 8
pub num_closest_nodes: usize,
/// The maximum number of messages to return from a store and forward retrieval request.
/// Default: 100
pub max_returned_messages: usize,
/// The time-to-live duration used for storage of low priority messages by the Store-and-forward middleware.
/// Default: 6 hours
pub low_priority_msg_storage_ttl: Duration,
/// The time-to-live duration used for storage of high priority messages by the Store-and-forward middleware.
/// Default: 3 days
pub high_priority_msg_storage_ttl: Duration,
/// The limit on the message size to store in SAF storage in bytes. Default 500 KiB
pub max_message_size: usize,
/// When true, store and forward messages are requested from peers on connect (Default: true)
pub auto_request: bool,
/// The maximum allowed time between asking for a message and accepting a response
pub max_inflight_request_age: Duration,
/// The maximum number of peer nodes that a message must be closer than to get stored by SAF
/// Default: 8
pub num_neighbouring_nodes: usize,
}

impl Default for SafConfig {
fn default() -> Self {
Self {
msg_validity: Duration::from_secs(3 * 60 * 60), // 3 hours
num_closest_nodes: 10,
max_returned_messages: 50,
msg_storage_capacity: 100_000,
low_priority_msg_storage_ttl: Duration::from_secs(6 * 60 * 60), // 6 hours
high_priority_msg_storage_ttl: Duration::from_secs(3 * 24 * 60 * 60), // 3 days
auto_request: true,
max_message_size: 512 * 1024,
max_inflight_request_age: Duration::from_secs(120),
num_neighbouring_nodes: 8,
}
}
}
Loading

0 comments on commit fbf8eb8

Please sign in to comment.