Skip to content

Commit

Permalink
feat: determine leader from payload (#221)
Browse files Browse the repository at this point in the history
Description
---
* Add a leader strategy that is determined by the payload_id and shard_id. 
* Allow the caller of submit-transaction to specify a custom timeout to wait
* Add client side timing stats to transaction summary
  • Loading branch information
stringhandler authored Nov 25, 2022
1 parent 1325d9a commit ab3c7fa
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 6 deletions.
8 changes: 7 additions & 1 deletion applications/tari_validator_node/src/json_rpc/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,13 @@ impl JsonRpcHandlers {
.map_err(internal_error(answer_id))?;

if transaction.wait_for_result {
return wait_for_transaction_result(answer_id, hash, subscription, Duration::from_secs(30)).await;
return wait_for_transaction_result(
answer_id,
hash,
subscription,
Duration::from_secs(transaction.wait_for_result_timeout.unwrap_or(30)),
)
.await;
}

Ok(JsonRpcResponse::success(answer_id, SubmitTransactionResponse {
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_validator_node/src/json_rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn handler(
Extension(handlers): Extension<Arc<JsonRpcHandlers>>,
ContentLengthLimit(value): ContentLengthLimit<JsonRpcExtractor, JSON_SIZE_LIMIT_BYTES>,
) -> JrpcResult {
info!(target: LOG_TARGET, "🌐 JSON-RPC request: {}", value.method);
debug!(target: LOG_TARGET, "🌐 JSON-RPC request: {}", value.method);
match value.method.as_str() {
// Transaction
// "get_transaction_status" => handlers.get_transaction_status(value).await,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tari_dan_core::{
services::{
epoch_manager::EpochManager,
infrastructure_services::OutboundService,
leader_strategy::AlwaysFirstLeader,
leader_strategy::PayloadSpecificLeaderStrategy,
},
workers::{
events::{EventSubscription, HotStuffEvent},
Expand Down Expand Up @@ -96,7 +96,7 @@ impl HotstuffService {
let (tx_vote_message, rx_vote_message) = channel(100);
let (tx_events, _) = broadcast::channel(100);

let leader_strategy = AlwaysFirstLeader {};
let leader_strategy = PayloadSpecificLeaderStrategy {};

let consensus_constants = ConsensusConstants::devnet();

Expand Down
1 change: 1 addition & 0 deletions applications/tari_validator_node/tests/utils/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub async fn send_template_transaction(
fee: transaction.fee(),
sender_public_key: transaction.sender_public_key().clone(),
wait_for_result: true,
wait_for_result_timeout: None,
inputs: vec![],
num_outputs: 0,
};
Expand Down
11 changes: 9 additions & 2 deletions applications/tari_validator_node_cli/src/command/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use std::{
path::{Path, PathBuf},
str::FromStr,
time::{Duration, Instant},
};

use clap::{Args, Subcommand};
Expand Down Expand Up @@ -72,6 +73,9 @@ pub struct SubmitArgs {
pub struct CommonSubmitArgs {
#[clap(long, short = 'w')]
wait_for_result: bool,
/// Timeout in seconds
#[clap(long, short = 't')]
wait_for_result_timeout: Option<u64>,
#[clap(long, short = 'n')]
num_outputs: Option<u8>,
#[clap(long, short = 'i')]
Expand Down Expand Up @@ -216,13 +220,15 @@ async fn submit_transaction(
inputs: input_data,
num_outputs: common.num_outputs.unwrap_or(0),
wait_for_result: common.wait_for_result,
wait_for_result_timeout: common.wait_for_result_timeout,
};

if request.inputs.is_empty() && request.num_outputs == 0 {
println!("No inputs or outputs. This transaction will not be processed by the network.");
return Ok(());
}
println!("✅ Transaction {} submitted.", tx_hash);
let timer = Instant::now();
if common.wait_for_result {
println!("⏳️ Waiting for transaction result...");
println!();
Expand All @@ -234,13 +240,13 @@ async fn submit_transaction(
if let Some(diff) = result.finalize.result.accept() {
component_manager.commit_diff(diff)?;
}
summarize(&result);
summarize(&result, timer.elapsed());
}
Ok(())
}

#[allow(clippy::too_many_lines)]
fn summarize(result: &TransactionFinalizeResult) {
fn summarize(result: &TransactionFinalizeResult, time_taken: Duration) {
println!("✅️ Transaction finalized",);
println!();
println!("Epoch: {}", result.qc.epoch());
Expand Down Expand Up @@ -349,6 +355,7 @@ fn summarize(result: &TransactionFinalizeResult) {
for log in &result.finalize.logs {
println!("{}", log);
}
println!("Time taken: {:?}", time_taken);
println!();
println!("OVERALL DECISION: {:?}", result.decision);
}
Expand Down
2 changes: 2 additions & 0 deletions clients/validator_node_client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ pub struct SubmitTransactionRequest {
/// Set to true to wait for the transaction to complete before returning
#[serde(default)]
pub wait_for_result: bool,
#[serde(default)]
pub wait_for_result_timeout: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
17 changes: 17 additions & 0 deletions dan_layer/core/src/services/leader_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use digest::Digest;
use tari_crypto::hash::blake2::Blake256;
use tari_dan_common_types::{PayloadId, ShardId};

use crate::{models::Committee, services::infrastructure_services::NodeAddressable};
Expand Down Expand Up @@ -67,3 +69,18 @@ impl<TAddr: NodeAddressable> LeaderStrategy<TAddr> for AlwaysFirstLeader {
0
}
}

pub struct PayloadSpecificLeaderStrategy {}
impl<TAddr: NodeAddressable> LeaderStrategy<TAddr> for PayloadSpecificLeaderStrategy {
fn calculate_leader(&self, committee: &Committee<TAddr>, payload: PayloadId, shard: ShardId, round: u32) -> u32 {
// Perhaps a less heavy hasher in future?
let hash: Vec<u8> = Blake256::new()
.chain(payload.as_bytes())
.chain(shard.as_bytes())
.finalize()
.to_vec();
let hash = u32::from_le_bytes([hash[0], hash[1], hash[2], hash[3]]);
let first = (hash % committee.members.len() as u32) as u32;
(first + round) % committee.members.len() as u32
}
}

0 comments on commit ab3c7fa

Please sign in to comment.