Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
telezhnaya committed Oct 13, 2023
1 parent 7e28c38 commit 3ee947f
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 94 deletions.
11 changes: 1 addition & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions chain/client/src/view_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,8 @@ impl ViewClientActor {
.chain
.get_block_header(&execution_outcome.transaction_outcome.block_hash)?])
{
Ok(_) => TxExecutionStatus::InclusionFinal,
Err(_) => TxExecutionStatus::Inclusion,
Ok(_) => TxExecutionStatus::IncludedFinal,
Err(_) => TxExecutionStatus::Included,
}
}
}
Expand Down
24 changes: 12 additions & 12 deletions chain/jsonrpc-primitives/src/types/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ use serde_json::Value;
#[derive(Debug, Clone)]
pub struct RpcBroadcastTransactionRequest {
pub signed_transaction: near_primitives::transaction::SignedTransaction,
pub finality: near_primitives::views::TxExecutionStatus,
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct RpcTransactionStatusCommonRequest {
pub struct RpcTransactionStatusRequest {
#[serde(flatten)]
pub transaction_info: TransactionInfo,
#[serde(default)]
pub finality: near_primitives::views::TxExecutionStatus,
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -56,21 +59,18 @@ pub struct RpcBroadcastTxSyncResponse {
pub transaction_hash: near_primitives::hash::CryptoHash,
}

impl From<TransactionInfo> for RpcTransactionStatusCommonRequest {
fn from(transaction_info: TransactionInfo) -> Self {
Self { transaction_info }
}
}

impl From<near_primitives::transaction::SignedTransaction> for RpcTransactionStatusCommonRequest {
impl From<near_primitives::transaction::SignedTransaction> for TransactionInfo {
fn from(transaction_info: near_primitives::transaction::SignedTransaction) -> Self {
Self { transaction_info: transaction_info.into() }
Self::Transaction(transaction_info)
}
}

impl From<near_primitives::transaction::SignedTransaction> for TransactionInfo {
fn from(transaction_info: near_primitives::transaction::SignedTransaction) -> Self {
Self::Transaction(transaction_info)
impl From<near_primitives::views::TxStatusView> for RpcTransactionResponse {
fn from(view: near_primitives::views::TxStatusView) -> Self {
Self {
final_execution_outcome: view.execution_outcome,
final_execution_status: view.status,
}
}
}

Expand Down
79 changes: 67 additions & 12 deletions chain/jsonrpc/src/api/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,53 @@ use serde_json::Value;
use near_client_primitives::types::TxStatusError;
use near_jsonrpc_primitives::errors::RpcParseError;
use near_jsonrpc_primitives::types::transactions::{
RpcBroadcastTransactionRequest, RpcTransactionError, RpcTransactionStatusCommonRequest,
RpcBroadcastTransactionRequest, RpcTransactionError, RpcTransactionStatusRequest,
TransactionInfo,
};
use near_primitives::borsh::BorshDeserialize;
use near_primitives::transaction::SignedTransaction;
use near_primitives::views::TxExecutionStatus;

use super::{Params, RpcFrom, RpcRequest};

impl RpcRequest for RpcBroadcastTransactionRequest {
fn parse(value: Value) -> Result<Self, RpcParseError> {
let signed_transaction =
Params::new(value).try_singleton(|value| decode_signed_transaction(value)).unwrap()?;
Ok(Self { signed_transaction })
let tx_request = Params::new(value)
.try_singleton(|value| {
Ok(RpcBroadcastTransactionRequest {
signed_transaction: decode_signed_transaction(value)?,
finality: Default::default(),
})
})
.try_pair(|encoded_tx, finality: String| {
let finality = format!("\"{}\"", finality);
Ok(RpcBroadcastTransactionRequest {
signed_transaction: decode_signed_transaction(encoded_tx)?,
finality: serde_json::from_str::<TxExecutionStatus>(&finality).map_err(
|err| RpcParseError(format!("Failed to decode finality: {}", err)),
)?,
})
})
.unwrap()?;
Ok(tx_request)
}
}

impl RpcRequest for RpcTransactionStatusCommonRequest {
impl RpcRequest for RpcTransactionStatusRequest {
fn parse(value: Value) -> Result<Self, RpcParseError> {
Ok(Params::new(value)
.try_singleton(|signed_tx| decode_signed_transaction(signed_tx).map(|x| x.into()))
.try_singleton(|signed_tx| {
Ok(RpcTransactionStatusRequest {
transaction_info: decode_signed_transaction(signed_tx)?.into(),
finality: Default::default(),
})
})
.try_pair(|tx_hash, sender_account_id| {
Ok(TransactionInfo::TransactionId { tx_hash, sender_account_id }.into())
Ok(RpcTransactionStatusRequest {
transaction_info: TransactionInfo::TransactionId { tx_hash, sender_account_id }
.into(),
finality: Default::default(),
})
})
.unwrap_or_parse()?)
}
Expand Down Expand Up @@ -62,7 +87,7 @@ fn decode_signed_transaction(value: String) -> Result<SignedTransaction, RpcPars
mod tests {
use crate::api::RpcRequest;
use near_jsonrpc_primitives::types::transactions::{
RpcBroadcastTransactionRequest, RpcTransactionStatusCommonRequest,
RpcBroadcastTransactionRequest, RpcTransactionStatusRequest,
};
use near_primitives::borsh;
use near_primitives::hash::CryptoHash;
Expand All @@ -74,15 +99,24 @@ mod tests {
let tx_hash = CryptoHash::new().to_string();
let account_id = "sender.testnet";
let params = serde_json::json!([tx_hash, account_id]);
assert!(RpcTransactionStatusCommonRequest::parse(params).is_ok());
assert!(RpcTransactionStatusRequest::parse(params).is_ok());
}

#[test]
fn test_serialize_tx_status_params_as_object() {
let tx_hash = CryptoHash::new().to_string();
let account_id = "sender.testnet";
let params = serde_json::json!({"tx_hash": tx_hash, "sender_account_id": account_id});
assert!(RpcTransactionStatusCommonRequest::parse(params).is_ok());
assert!(RpcTransactionStatusRequest::parse(params).is_ok());
}

#[test]
fn test_serialize_tx_status_params_as_object_with_finality() {
let tx_hash = CryptoHash::new().to_string();
let account_id = "sender.testnet";
let finality = "INCLUDED";
let params = serde_json::json!({"tx_hash": tx_hash, "sender_account_id": account_id, "finality": finality});
assert!(RpcTransactionStatusRequest::parse(params).is_ok());
}

#[test]
Expand All @@ -92,15 +126,25 @@ mod tests {
let bytes_tx = borsh::to_vec(&tx).unwrap();
let str_tx = to_base64(&bytes_tx);
let params = serde_json::json!([str_tx]);
assert!(RpcTransactionStatusCommonRequest::parse(params).is_ok());
assert!(RpcTransactionStatusRequest::parse(params).is_ok());
}

// The params are invalid because sender_account_id is missing
#[test]
fn test_serialize_invalid_tx_status_params() {
let tx_hash = CryptoHash::new().to_string();
let params = serde_json::json!([tx_hash]);
assert!(RpcTransactionStatusCommonRequest::parse(params).is_err());
assert!(RpcTransactionStatusRequest::parse(params).is_err());
}

// The params are invalid because finality is supported only in tx status params passed by object
#[test]
fn test_serialize_tx_status_too_many_params() {
let tx_hash = CryptoHash::new().to_string();
let account_id = "sender.testnet";
let finality = "EXECUTED";
let params = serde_json::json!([tx_hash, account_id, finality]);
assert!(RpcTransactionStatusRequest::parse(params).is_err());
}

#[test]
Expand All @@ -112,4 +156,15 @@ mod tests {
let params = serde_json::json!([str_tx]);
assert!(RpcBroadcastTransactionRequest::parse(params).is_ok());
}

#[test]
fn test_serialize_send_tx_params_as_binary_signed_tx_with_finality() {
let tx_hash = CryptoHash::new();
let tx = SignedTransaction::empty(tx_hash);
let bytes_tx = borsh::to_vec(&tx).unwrap();
let str_tx = to_base64(&bytes_tx);
let finality = "EXECUTED";
let params = serde_json::json!([str_tx, finality]);
assert!(RpcBroadcastTransactionRequest::parse(params).is_ok());
}
}
75 changes: 19 additions & 56 deletions chain/jsonrpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ impl JsonRpcHandler {
Ok(match request.method.as_ref() {
// Handlers ordered alphabetically
"block" => process_method_call(request, |params| self.block(params)).await,
// todo add broadcast_tx
"broadcast_tx_async" => {
process_method_call(request, |params| self.send_tx_async(params)).await
}
Expand All @@ -332,6 +333,7 @@ impl JsonRpcHandler {
}
"network_info" => process_method_call(request, |_params: ()| self.network_info()).await,
"status" => process_method_call(request, |_params: ()| self.status()).await,
// todo recheck which finality is used for tx and exp tx status
"tx" => {
process_method_call(request, |params| self.tx_status_common(params, false)).await
}
Expand Down Expand Up @@ -530,9 +532,13 @@ impl JsonRpcHandler {
})?
}

/// Return status of the given transaction
///
/// `finality` forces the execution to wait until the desired finality level is reached
async fn tx_status_fetch(
&self,
tx_info: near_jsonrpc_primitives::types::transactions::TransactionInfo,
finality: near_primitives::views::TxExecutionStatus,
fetch_receipt: bool,
) -> Result<
near_jsonrpc_primitives::types::transactions::RpcTransactionResponse,
Expand All @@ -557,11 +563,8 @@ impl JsonRpcHandler {
.await;
match tx_status_result {
Ok(result) => {
if let Some(outcome) = result.execution_outcome {
break Ok(RpcTransactionResponse {
final_execution_outcome: Some(outcome),
final_execution_status: result.status,
})
if result.status >= finality {
break Ok(result.into())
}
// else: No such transaction recorded on chain yet
},
Expand Down Expand Up @@ -598,42 +601,6 @@ impl JsonRpcHandler {
})?
}

async fn tx_polling(
&self,
tx_info: near_jsonrpc_primitives::types::transactions::TransactionInfo,
) -> Result<
near_jsonrpc_primitives::types::transactions::RpcTransactionResponse,
near_jsonrpc_primitives::types::transactions::RpcTransactionError,
> {
timeout(self.polling_config.polling_timeout, async {
loop {
match self.tx_status_fetch(tx_info.clone(), false).await {
Ok(tx_status) => {
break Ok(tx_status)
}
// If transaction is missing, keep polling.
Err(near_jsonrpc_primitives::types::transactions::RpcTransactionError::UnknownTransaction {
..
}) => {}
// If we hit any other error, we return to the user.
Err(err) => {
break Err(err.rpc_into());
}
}
sleep(self.polling_config.polling_interval).await;
}
})
.await
.map_err(|_| {
metrics::RPC_TIMEOUT_TOTAL.inc();
tracing::warn!(
target: "jsonrpc", "Timeout: tx_polling method. tx_info {:?}",
tx_info,
);
near_jsonrpc_primitives::types::transactions::RpcTransactionError::TimeoutError
})?
}

/// Send a transaction idempotently (subsequent send of the same transaction will not cause
/// any new side-effects and the result will be the same unless we garbage collected it
/// already).
Expand Down Expand Up @@ -703,21 +670,15 @@ impl JsonRpcHandler {
near_jsonrpc_primitives::types::transactions::RpcTransactionError,
> {
let tx = request_data.signed_transaction;
match self.tx_status_fetch(tx.clone().into(), false).await
{
Ok(outcome) => {
return Ok(outcome);
}
Err(err @ near_jsonrpc_primitives::types::transactions::RpcTransactionError::InvalidTransaction {
..
}) => {
return Err(err);
}
_ => {}
}
match self.send_tx(tx.clone(), false).await? {
ProcessTxResponse::ValidTx | ProcessTxResponse::RequestRouted => {
self.tx_polling(tx.into()).await
self.tx_status_fetch(
near_jsonrpc_primitives::types::transactions::TransactionInfo::Transaction(
tx.clone(),
),
TxExecutionStatus::Executed,
false,
).await
}
network_client_response=> {
Err(
Expand Down Expand Up @@ -871,13 +832,15 @@ impl JsonRpcHandler {

async fn tx_status_common(
&self,
request_data: near_jsonrpc_primitives::types::transactions::RpcTransactionStatusCommonRequest,
request_data: near_jsonrpc_primitives::types::transactions::RpcTransactionStatusRequest,
fetch_receipt: bool,
) -> Result<
near_jsonrpc_primitives::types::transactions::RpcTransactionResponse,
near_jsonrpc_primitives::types::transactions::RpcTransactionError,
> {
let tx_status = self.tx_status_fetch(request_data.transaction_info, fetch_receipt).await?;
let tx_status = self
.tx_status_fetch(request_data.transaction_info, request_data.finality, fetch_receipt)
.await?;
Ok(tx_status.rpc_into())
}

Expand Down
9 changes: 7 additions & 2 deletions core/primitives/src/views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1672,16 +1672,21 @@ pub struct TxStatusView {
serde::Deserialize,
Clone,
Debug,
Default,
Eq,
PartialEq,
Ord,
PartialOrd,
)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum TxExecutionStatus {
/// Transaction is waiting to be included into the block
None,
/// Transaction is included into the block. The block may be not finalised yet
Inclusion,
Included,
/// Transaction is included into finalised block
InclusionFinal,
#[default]
IncludedFinal,
/// Transaction is included into finalised block +
/// All the transaction receipts finished their execution.
/// The corresponding blocks for each receipt may be not finalised yet
Expand Down

0 comments on commit 3ee947f

Please sign in to comment.