Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: be more permissive of responses for the incorrect request_id #3588

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 11 additions & 33 deletions RFC/src/RFC-0250_Covenants.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,105 +206,83 @@ little-endian 64-byte unsigned integer.

The output set is returned unaltered. This rule is implicit for an empty (0 byte) covenant.

```yaml
op_byte: 0x20
op_byte: 0x20<br>
args: []
```

##### and(A, B)

The intersection (\\(A \cap B\\)) of the resulting output set for covenant rules \\(A\\) and \\(B\\).

```yaml
op_byte: 0x21
op_byte: 0x21<br>
args: [Covenant, Covenant]
```

##### or(A, B)

The union (\\(A \cup B\\)) of the resulting output set for covenant rules \\(A\\) and \\(B\\).

```yaml
op_byte: 0x22
op_byte: 0x22<br>
args: [Covenant, Covenant]
```

##### xor(A, B)

The symmetric difference (\\(A \triangle B\\)) of the resulting output set for covenant rules \\(A\\) and \\(B\\).
This is, outputs that match either \\(A\\) or \\(B\\) but not both.

```yaml
op_byte: 0x23
op_byte: 0x23<br>
args: [Covenant, Covenant]
```

##### not(A)

Returns the compliment of `A`. That is, all the elements of `A` are removed from the
resultant output set.

```yaml
op_byte: 0x24
op_byte: 0x24<br>
args: [Covenant]
```

##### empty()

Returns an empty set. This will always fail and, if used alone, prevents the UTXO from ever being spent.
A more useful reason to use `empty` is in conjunction a conditional e.g. `if_else(Condition(older_rel(10)), A, empty)`

```yaml
op_byte: 0x25
op_byte: 0x25<br>
args: []
```

#### Filters

##### filter_output_hash_eq(hash)

Filters for a single output that matches the hash. This filter only returns zero or one outputs.

```yaml
op_byte: 0x30
op_byte: 0x30<br>
args: [Hash]
```

##### filter_fields_preserved(fields)

Filter for outputs where all given fields in the input are preserved in the output.

```yaml
op_byte: 0x31
op_byte: 0x31<br>
args: [Fields]
```

##### filter_field_int_eq(field, int)

Filters for outputs whose field value matches the given integer value. If the given field cannot be cast
to an unsigned 64-bit integer, the transaction/block is rejected.

```yaml
op_byte: 0x32
op_byte: 0x32<br>
args: [Field, VarInt]
```

##### filter_fields_hashed_eq(fields, hash)

```yaml
op_byte: 0x33
op_byte: 0x33<br>
args: [Fields, VarInt]
```

##### filter_relative_height(height)

Checks the block height that current [UTXO] (i.e. the current input) was mined plus `height` is greater than or
equal to the current block height. If so, the `identity()` is returned, otherwise `empty()`.

```yaml
op_byte: 0x34
op_byte: 0x34<br>
args: [VarInt]
```

#### Encoding / Decoding

Expand Down
6 changes: 5 additions & 1 deletion applications/daily_tests/washing_machine.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,18 @@ function WashingMachine(options) {
`🚀 Launching washing machine (numTransactions = ${numTransactions}, numRounds = ${numRounds}, sleep = ${sleepAfterRound}s)`
);

debug(`Connecting to wallet1 at ${wallet1Grpc}...`);
await this.wallet1.connect(wallet1Grpc);
debug(`Connected.`);

debug("Compiling and starting applications...");
let wallet2Process = null;
// Start wallet2
if (wallet2Grpc) {
this.wallet2 = new WalletClient();
debug(`Connecting to wallet2 at ${wallet2Grpc}...`);
await this.wallet2.connect(wallet2Grpc);
} else {
debug("Compiling wallet2...");
const port = await getFreePort(20000, 25000);
wallet2Process = createGrpcWallet(
baseNodeSeed,
Expand All @@ -148,6 +151,7 @@ function WashingMachine(options) {
true
);
wallet2Process.baseDir = "./wallet";
debug("Starting wallet2...");
await wallet2Process.startNew();
this.wallet2 = await wallet2Process.connectClient();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,14 @@ fn parse_make_it_rain(mut args: SplitWhitespace) -> Result<Vec<ParsedArgument>,
parsed_args.push(ParsedArgument::PublicKey(pubkey));

// transaction type
let txn_type = args
.next()
.ok_or_else(|| ParseError::Empty("transaction type".to_string()))?;
let txn_type = args.next();
let negotiated = match txn_type {
"negotiated" => true,
"one_sided" => false,
Some("negotiated") | Some("interactive") => true,
Some("one_sided") | Some("one-sided") | Some("onesided") => false,
_ => {
println!("Invalid data provided for <transaction type>, must be 'negotiated' or 'one_sided'\n");
println!("Invalid data provided for <transaction type>, must be 'interactive' or 'one-sided'\n");
return Err(ParseError::Invalid(
"Invalid data provided for <transaction type>, must be 'negotiated' or 'one_sided'".to_string(),
"Invalid data provided for <transaction type>, must be 'interactive' or 'one-sided'".to_string(),
));
},
};
Expand Down Expand Up @@ -531,7 +529,7 @@ mod test {
Err(e) => match e {
ParseError::Invalid(e) => assert_eq!(
e,
"Invalid data provided for <transaction type>, must be 'negotiated' or 'one_sided'".to_string()
"Invalid data provided for <transaction type>, must be 'interactive' or 'one-sided'".to_string()
),
_ => panic!("Expected parsing <transaction type> to return an error here"),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {

let resp = match client.find_chain_split(request).await {
Ok(r) => r,
Err(RpcError::RequestFailed(err)) if err.status_code().is_not_found() => {
Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
// This round we sent less hashes than the max, so the next round will not have any more hashes to
// send. Exit early in this case.
if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS {
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/sync/rpc/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ mod sync_blocks {
};
let req = rpc_request_mock.request_with_context(Default::default(), msg);
let err = service.sync_blocks(req).await.unwrap_err();
unpack_enum!(RpcStatusCode::NotFound = err.status_code());
unpack_enum!(RpcStatusCode::NotFound = err.as_status_code());
}

#[tokio::test]
Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/src/mempool/rpc/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ mod get_tx_state_by_excess_sig {
.await
.unwrap_err();

unpack_enum!(RpcStatusCode::BadRequest = status.status_code());
unpack_enum!(RpcStatusCode::BadRequest = status.as_status_code());
}
}

Expand Down Expand Up @@ -174,6 +174,6 @@ mod submit_transaction {
.await
.unwrap_err();

unpack_enum!(RpcStatusCode::BadRequest = status.status_code());
unpack_enum!(RpcStatusCode::BadRequest = status.as_status_code());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
use log::*;
use std::{collections::HashMap, convert::TryInto, sync::Arc};
use tari_common_types::types::BlockHash;
use tari_comms::protocol::rpc::{RpcError::RequestFailed, RpcStatusCode::NotFound};
use tari_comms::protocol::rpc::RpcError::RequestFailed;
use tari_core::{
base_node::rpc::BaseNodeWalletRpcClient,
blocks::BlockHeader,
Expand Down Expand Up @@ -353,7 +353,7 @@ where
info!(target: LOG_TARGET, "Error asking base node for header:{}", rpc_error);
match &rpc_error {
RequestFailed(status) => {
if status.status_code() == NotFound {
if status.as_status_code().is_not_found() {
return Ok(None);
} else {
return Err(rpc_error.into());
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/transaction_service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl Default for TransactionServiceConfig {
direct_send_timeout: Duration::from_secs(20),
broadcast_send_timeout: Duration::from_secs(60),
low_power_polling_timeout: Duration::from_secs(300),
transaction_resend_period: Duration::from_secs(3600),
transaction_resend_period: Duration::from_secs(600),
resend_response_cooldown: Duration::from_secs(300),
pending_transaction_cancellation_timeout: Duration::from_secs(259200), // 3 Days
num_confirmations_required: 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ where
warn!(target: LOG_TARGET, "Error asking base node for header:{}", rpc_error);
match &rpc_error {
RequestFailed(status) => {
if status.status_code() == NotFound {
if status.as_status_code() == NotFound {
return Ok(None);
} else {
return Err(rpc_error.into());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ where TBackend: WalletBackend + 'static
// this returns the index of the vec of hashes we sent it, that is the last hash it knows of.
match client.find_chain_split(request).await {
Ok(_) => Ok(metadata.utxo_index + 1),
Err(RpcError::RequestFailed(err)) if err.status_code().is_not_found() => {
Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
warn!(target: LOG_TARGET, "Reorg detected: {}", err);
// The node does not know of the last hash we scanned, thus we had a chain split.
// We now start at 0 again.
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/examples/memory_net/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ pub async fn network_connectivity_stats(nodes: &[TestNode], wallets: &[TestNode]
total += t;
avg += a;
println!(
"{} total connections on the network. ({} per node on average)",
"{} total connections on the network. ({} per peer on average)",
total,
avg / (wallets.len() + nodes.len())
);
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/src/rpc/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ mod get_closer_peers {
let node_id = NodeId::default();
let req = mock.request_with_context(node_id, req);
let err = service.get_closer_peers(req).await.unwrap_err();
assert_eq!(err.status_code(), RpcStatusCode::BadRequest);
assert_eq!(err.as_status_code(), RpcStatusCode::BadRequest);
}
}

Expand Down
2 changes: 1 addition & 1 deletion comms/rpc_macros/tests/macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async fn it_returns_an_error_for_invalid_method_nums() {
.await
.unwrap_err();

unpack_enum!(RpcStatusCode::UnsupportedMethod = err.status_code());
unpack_enum!(RpcStatusCode::UnsupportedMethod = err.as_status_code());
}

#[tokio::test]
Expand Down
4 changes: 4 additions & 0 deletions comms/src/connection_manager/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
common,
dial_state::DialState,
manager::{ConnectionManagerConfig, ConnectionManagerEvent},
metrics,
peer_connection,
},
multiaddr::Multiaddr,
Expand Down Expand Up @@ -193,6 +194,7 @@ where
dial_result: Result<PeerConnection, ConnectionManagerError>,
) {
let node_id = dial_state.peer().node_id.clone();
metrics::pending_connections(Some(&node_id), ConnectionDirection::Outbound).inc();

let removed = self.cancel_signals.remove(&node_id);
drop(removed);
Expand All @@ -213,6 +215,8 @@ where
},
}

metrics::pending_connections(Some(&node_id), ConnectionDirection::Outbound).dec();

if self.pending_dial_requests.contains_key(&node_id) {
self.reply_to_pending_requests(&node_id, dial_result.clone());
}
Expand Down
4 changes: 4 additions & 0 deletions comms/src/connection_manager/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::{
bounded_executor::BoundedExecutor,
connection_manager::{
liveness::LivenessSession,
metrics,
wire_mode::{WireMode, LIVENESS_WIRE_MODE},
},
multiaddr::Multiaddr,
Expand Down Expand Up @@ -239,6 +240,7 @@ where

let span = span!(Level::TRACE, "connection_mann::listener::inbound_task",);
let inbound_fut = async move {
metrics::pending_connections(None, ConnectionDirection::Inbound).inc();
match Self::read_wire_format(&mut socket, config.time_to_first_byte).await {
Ok(WireMode::Comms(byte)) if byte == config.network_info.network_byte => {
let this_node_id_str = node_identity.node_id().short_str();
Expand Down Expand Up @@ -325,6 +327,8 @@ where
);
},
}

metrics::pending_connections(None, ConnectionDirection::Inbound).dec();
}
.instrument(span);

Expand Down
20 changes: 17 additions & 3 deletions comms/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use super::{
};
use crate::{
backoff::Backoff,
connection_manager::{metrics, ConnectionDirection},
multiplexing::Substream,
noise::NoiseConfig,
peer_manager::{NodeId, NodeIdentity},
Expand Down Expand Up @@ -397,10 +398,14 @@ where
node_id.short_str(),
proto_str
);
metrics::inbound_substream_counter(&node_id, &protocol).inc();
let notify_fut = self
.protocols
.notify(&protocol, ProtocolEvent::NewInboundSubstream(node_id, stream));
match time::timeout(Duration::from_secs(10), notify_fut).await {
Ok(Ok(_)) => {
debug!(target: LOG_TARGET, "Protocol notification for '{}' sent", proto_str);
},
Ok(Err(err)) => {
error!(
target: LOG_TARGET,
Expand All @@ -413,12 +418,21 @@ where
"Error sending NewSubstream notification for protocol '{}' because {}", proto_str, err
);
},
_ => {
debug!(target: LOG_TARGET, "Protocol notification for '{}' sent", proto_str);
},
}
},

PeerConnected(conn) => {
metrics::successful_connections(conn.peer_node_id(), conn.direction()).inc();
self.publish_event(PeerConnected(conn));
},
PeerConnectFailed(peer, err) => {
metrics::failed_connections(&peer, ConnectionDirection::Outbound).inc();
self.publish_event(PeerConnectFailed(peer, err));
},
PeerInboundConnectFailed(err) => {
metrics::failed_connections(&Default::default(), ConnectionDirection::Inbound).inc();
self.publish_event(PeerInboundConnectFailed(err));
},
event => {
self.publish_event(event);
},
Expand Down
Loading