Skip to content

Commit

Permalink
Added TransactionPoll::Error
Browse files Browse the repository at this point in the history
  • Loading branch information
ChaoticTempest committed Oct 31, 2022
1 parent 097a20c commit da16342
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 13 deletions.
30 changes: 20 additions & 10 deletions workspaces/src/operations.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! All operation types that are generated/used when making transactions or view calls.

use crate::error::ErrorKind;
use crate::error::{Error, ErrorKind, RpcErrorCode};
use crate::result::{Execution, ExecutionFinalResult, Result, ViewResultDetails};
use crate::rpc::client::{
send_batch_tx_and_retry, send_batch_tx_async_and_retry, Client, DEFAULT_CALL_DEPOSIT,
Expand All @@ -14,6 +14,8 @@ use crate::worker::Worker;
use crate::{Account, CryptoHash, Network};

use near_account_id::ParseAccountError;
use near_jsonrpc_client::errors::{JsonRpcError, JsonRpcServerError};
use near_jsonrpc_client::methods::tx::RpcTransactionError;
use near_primitives::transaction::{
Action, AddKeyAction, CreateAccountAction, DeleteAccountAction, DeleteKeyAction,
DeployContractAction, FunctionCallAction, StakeAction, TransferAction,
Expand Down Expand Up @@ -456,7 +458,7 @@ impl<'a> TransactionStatus<'a> {
}
}

/// Checks the status of the transaction. If an `Err` is returned, then the
/// Checks the status of the transaction. If an [`TransactionPoll::Pending`] is returned, then the
/// transaction has not completed yet.
pub async fn status(&self) -> TransactionPoll<ExecutionFinalResult> {
let result = self
Expand All @@ -469,16 +471,23 @@ impl<'a> TransactionStatus<'a> {
.map(ExecutionFinalResult::from_view);

match result {
Ok(result) => TransactionPoll::Complete(result),
Err(err) => TransactionPoll::Pending(err.to_string()),
Ok(result) => TransactionPoll::Ready(result),
Err(err) => match err {
JsonRpcError::ServerError(JsonRpcServerError::HandlerError(
RpcTransactionError::UnknownTransaction { .. },
)) => TransactionPoll::Pending,
other => TransactionPoll::Error(RpcErrorCode::BroadcastTxFailure.custom(other)),
},
}
}

/// Wait until the completion of the transaction by polling [`TransactionStatus::status`].
pub(crate) async fn wait(self) -> ExecutionFinalResult {
pub(crate) async fn wait(self) -> Result<ExecutionFinalResult> {
loop {
if let TransactionPoll::Complete(val) = self.status().await {
break val;
match self.status().await {
TransactionPoll::Ready(val) => break Ok(val),
TransactionPoll::Error(err) => break Err(err),
TransactionPoll::Pending => (),
}

tokio::time::sleep(std::time::Duration::from_millis(300)).await;
Expand All @@ -498,12 +507,13 @@ impl<'a> TransactionStatus<'a> {

#[derive(Debug)]
pub enum TransactionPoll<T> {
Pending(String),
Complete(T),
Ready(T),
Pending,
Error(Error),
}

impl<'a> IntoFuture for TransactionStatus<'a> {
type Output = ExecutionFinalResult;
type Output = Result<ExecutionFinalResult>;
type IntoFuture = BoxFuture<'a, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
Expand Down
3 changes: 1 addition & 2 deletions workspaces/src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,14 @@ impl Client {
&self,
sender_id: &AccountId,
hash: CryptoHash,
) -> Result<FinalExecutionOutcomeView> {
) -> Result<FinalExecutionOutcomeView, JsonRpcError<RpcTransactionError>> {
self.query(methods::tx::RpcTransactionStatusRequest {
transaction_info: methods::tx::TransactionInfo::TransactionId {
account_id: sender_id.clone(),
hash,
},
})
.await
.map_err(|e| RpcErrorCode::BroadcastTxFailure.custom(e))
}

pub(crate) async fn wait_for_rpc(&self) -> Result<()> {
Expand Down
3 changes: 2 additions & 1 deletion workspaces/tests/parallel_transactions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::VecDeque;

use serde_json::json;
use workspaces::operations::TransactionPoll;

const STATUS_MSG_CONTRACT: &[u8] = include_bytes!("../../examples/res/status_message.wasm");

Expand Down Expand Up @@ -63,7 +64,7 @@ async fn test_parallel_async() -> anyhow::Result<()> {
// Retry checking the statuses of all transactions until the queue is empty
// with all transactions completed.
while let Some(status) = statuses.pop_front() {
if let Err(_err) = status.status().await {
if matches!(status.status().await, TransactionPoll::Pending) {
statuses.push_back(status);
}
}
Expand Down

0 comments on commit da16342

Please sign in to comment.