Skip to content

Commit

Permalink
Handle receiver cancelling an inbound transaction that is later received
Browse files Browse the repository at this point in the history
# Based on tari-project#3164

This PR addresses the following scenario spotted by @stanimal:

 - NodeA sends to nodeB(offline) 
- NodeA goes offline 
- NodeB receives tx, and cancels it (weird I know) 
- NodeA comes online and broadcasts the transaction 
- NodeB is not aware of the transaction, transaction complete for NodeA

This is handled by adding logic that if a FinalizedTransaction is received with no active Receive Protocols that the database is checked if there is a matching cancelled inbound transaction from the same pubkey. If there is the receiver might as well restart that protocol and accept the finalized transaction.

A cucumber test is provided to test this case.

This required adding in functionality to the Transaction and Output Manager service to reinstate a cancelled inbound transaction, unit tests provided for that.
  • Loading branch information
philipr-za committed Aug 10, 2021
1 parent 03c5146 commit 35bab9f
Show file tree
Hide file tree
Showing 16 changed files with 510 additions and 78 deletions.
10 changes: 10 additions & 0 deletions applications/tari_app_grpc/proto/wallet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ service Wallet {
rpc GetNetworkStatus(Empty) returns (NetworkStatusResponse);
// List currently connected peers
rpc ListConnectedPeers(Empty) returns (ListConnectedPeersResponse);
// Cancel pending transaction
rpc CancelTransaction (CancelTransactionRequest) returns (CancelTransactionResponse);
}

message GetVersionRequest { }
Expand Down Expand Up @@ -185,3 +187,11 @@ message ImportUtxosResponse {
repeated uint64 tx_ids = 1;
}

message CancelTransactionRequest {
uint64 tx_id = 1;
}

message CancelTransactionResponse {
bool is_success = 1;
string failure_message = 2;
}
27 changes: 27 additions & 0 deletions applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,33 @@ impl wallet_server::Wallet for WalletGrpcServer {

Ok(Response::new(resp))
}

async fn cancel_transaction(
&self,
request: Request<tari_rpc::CancelTransactionRequest>,
) -> Result<Response<tari_rpc::CancelTransactionResponse>, Status> {
let message = request.into_inner();
debug!(
target: LOG_TARGET,
"Incoming gRPC request to Cancel Transaction (TxId: {})", message.tx_id,
);
let mut transaction_service = self.get_transaction_service();

match transaction_service.cancel_transaction(message.tx_id).await {
Ok(_) => {
return Ok(Response::new(tari_rpc::CancelTransactionResponse {
is_success: true,
failure_message: "".to_string(),
}))
},
Err(e) => {
return Ok(Response::new(tari_rpc::CancelTransactionResponse {
is_success: false,
failure_message: e.to_string(),
}))
},
}
}
}

fn convert_wallet_transaction_into_transaction_info(
Expand Down
14 changes: 14 additions & 0 deletions base_layer/wallet/src/output_manager_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub enum OutputManagerRequest {
ScanForRecoverableOutputs(Vec<TransactionOutput>),
ScanOutputs(Vec<TransactionOutput>),
AddKnownOneSidedPaymentScript(KnownOneSidedPaymentScript),
ReinstateCancelledInboundTx(TxId),
}

impl fmt::Display for OutputManagerRequest {
Expand Down Expand Up @@ -115,6 +116,7 @@ impl fmt::Display for OutputManagerRequest {
ScanForRecoverableOutputs(_) => write!(f, "ScanForRecoverableOutputs"),
ScanOutputs(_) => write!(f, "ScanRewindAndImportOutputs"),
AddKnownOneSidedPaymentScript(_) => write!(f, "AddKnownOneSidedPaymentScript"),
ReinstateCancelledInboundTx(_) => write!(f, "ReinstateCancelledInboundTx"),
}
}
}
Expand Down Expand Up @@ -149,6 +151,7 @@ pub enum OutputManagerResponse {
RewoundOutputs(Vec<UnblindedOutput>),
ScanOutputs(Vec<UnblindedOutput>),
AddKnownOneSidedPaymentScript,
ReinstatedCancelledInboundTx,
}

pub type OutputManagerEventSender = broadcast::Sender<Arc<OutputManagerEvent>>;
Expand Down Expand Up @@ -545,4 +548,15 @@ impl OutputManagerHandle {
_ => Err(OutputManagerError::UnexpectedApiResponse),
}
}

pub async fn reinstate_cancelled_inbound_transaction(&mut self, tx_id: TxId) -> Result<(), OutputManagerError> {
match self
.handle
.call(OutputManagerRequest::ReinstateCancelledInboundTx(tx_id))
.await??
{
OutputManagerResponse::ReinstatedCancelledInboundTx => Ok(()),
_ => Err(OutputManagerError::UnexpectedApiResponse),
}
}
}
26 changes: 26 additions & 0 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::{
types::{HashDigest, ValidationRetryStrategy},
};
use blake2::Digest;
use chrono::Utc;
use diesel::result::{DatabaseErrorKind, Error as DieselError};
use futures::{pin_mut, StreamExt};
use log::*;
Expand Down Expand Up @@ -343,6 +344,10 @@ where TBackend: OutputManagerBackend + 'static
.add_known_script(known_script)
.await
.map(|_| OutputManagerResponse::AddKnownOneSidedPaymentScript),
OutputManagerRequest::ReinstateCancelledInboundTx(tx_id) => self
.reinstate_cancelled_inbound_transaction(tx_id)
.await
.map(|_| OutputManagerResponse::ReinstatedCancelledInboundTx),
}
}

Expand Down Expand Up @@ -896,6 +901,27 @@ where TBackend: OutputManagerBackend + 'static
Ok(self.resources.db.cancel_pending_transaction_outputs(tx_id).await?)
}

/// Restore the pending transaction encumberance and output for an inbound transaction that was previously
/// cancelled.
async fn reinstate_cancelled_inbound_transaction(&mut self, tx_id: TxId) -> Result<(), OutputManagerError> {
self.resources.db.reinstate_inbound_output(tx_id).await?;

self.resources
.db
.add_pending_transaction_outputs(PendingTransactionOutputs {
tx_id,
outputs_to_be_spent: Vec::new(),
outputs_to_be_received: Vec::new(),
timestamp: Utc::now().naive_utc(),
coinbase_block_height: None,
})
.await?;

self.confirm_encumberance(tx_id).await?;

Ok(())
}

/// Go through the pending transaction and if any have existed longer than the specified duration, cancel them
async fn timeout_pending_transactions(&mut self, period: Duration) -> Result<(), OutputManagerError> {
Ok(self.resources.db.timeout_pending_transaction_outputs(period).await?)
Expand Down
49 changes: 48 additions & 1 deletion base_layer/wallet/src/output_manager_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use crate::output_manager_service::{
error::OutputManagerStorageError,
service::Balance,
storage::models::{DbUnblindedOutput, KnownOneSidedPaymentScript},
storage::models::{DbUnblindedOutput, KnownOneSidedPaymentScript, OutputStatus},
TxId,
};
use aes_gcm::Aes256Gcm;
Expand Down Expand Up @@ -135,6 +135,7 @@ pub enum DbKey {
KeyManagerState,
InvalidOutputs,
KnownOneSidedPaymentScripts,
OutputsByTxIdAndStatus(TxId, OutputStatus),
}

#[derive(Debug)]
Expand All @@ -149,6 +150,7 @@ pub enum DbValue {
KeyManagerState(KeyManagerState),
KnownOneSidedPaymentScripts(Vec<KnownOneSidedPaymentScript>),
AnyOutput(Box<DbUnblindedOutput>),
AnyOutputs(Vec<DbUnblindedOutput>),
}

pub enum DbKeyValuePair {
Expand All @@ -158,6 +160,7 @@ pub enum DbKeyValuePair {
PendingTransactionOutputs(TxId, Box<PendingTransactionOutputs>),
KeyManagerState(KeyManagerState),
KnownOneSidedPaymentScripts(KnownOneSidedPaymentScript),
UpdateOutputStatus(Commitment, OutputStatus),
}

pub enum WriteOperation {
Expand Down Expand Up @@ -713,6 +716,48 @@ where T: OutputManagerBackend + 'static
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;
Ok(())
}

/// Check if a single cancelled inbound output exists that matches this TxID, if it does then return its status to
/// EncumberedToBeReceived
pub async fn reinstate_inbound_output(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError> {
let db_clone = self.db.clone();
let outputs = tokio::task::spawn_blocking(move || {
match db_clone.fetch(&DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound)) {
Ok(None) => Err(OutputManagerStorageError::ValueNotFound),
Ok(Some(DbValue::AnyOutputs(o))) => Ok(o),
Ok(Some(other)) => unexpected_result(
DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound),
other,
),
Err(e) => log_error(DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound), e),
}
})
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))
.and_then(|inner_result| inner_result)?;

if outputs.len() != 1 {
return Err(OutputManagerStorageError::UnexpectedResult(
"There should be only 1 output for a cancelled inbound transaction but more were found".to_string(),
));
}
let db_clone2 = self.db.clone();

tokio::task::spawn_blocking(move || {
db_clone2.write(WriteOperation::Insert(DbKeyValuePair::UpdateOutputStatus(
outputs
.first()
.expect("Must be only one element in outputs")
.commitment
.clone(),
OutputStatus::EncumberedToBeReceived,
)))
})
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;

Ok(())
}
}

fn unexpected_result<T>(req: DbKey, res: DbValue) -> Result<T, OutputManagerStorageError> {
Expand All @@ -737,6 +782,7 @@ impl Display for DbKey {
DbKey::TimeLockedUnspentOutputs(_t) => f.write_str(&"Timelocked Outputs"),
DbKey::KnownOneSidedPaymentScripts => f.write_str(&"Known claiming scripts"),
DbKey::AnyOutputByCommitment(_) => f.write_str(&"AnyOutputByCommitment"),
DbKey::OutputsByTxIdAndStatus(_, _) => f.write_str(&"OutputsByTxIdAndStatus"),
}
}
}
Expand All @@ -754,6 +800,7 @@ impl Display for DbValue {
DbValue::InvalidOutputs(_) => f.write_str("Invalid Outputs"),
DbValue::KnownOneSidedPaymentScripts(_) => f.write_str(&"Known claiming scripts"),
DbValue::AnyOutput(_) => f.write_str(&"Any Output"),
DbValue::AnyOutputs(_) => f.write_str(&"Any Outputs"),
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions base_layer/wallet/src/output_manager_service/storage/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,14 @@ impl PartialEq for KnownOneSidedPaymentScript {
self.script_hash == other.script_hash
}
}

/// The status of a given output
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum OutputStatus {
Unspent,
Spent,
EncumberedToBeReceived,
EncumberedToBeSpent,
Invalid,
CancelledInbound,
}
52 changes: 40 additions & 12 deletions base_layer/wallet/src/output_manager_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
PendingTransactionOutputs,
WriteOperation,
},
models::{DbUnblindedOutput, KnownOneSidedPaymentScript},
models::{DbUnblindedOutput, KnownOneSidedPaymentScript, OutputStatus},
},
TxId,
},
Expand Down Expand Up @@ -135,6 +135,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
None
},
},

DbKey::AnyOutputByCommitment(commitment) => {
match OutputSql::find_by_commitment(&commitment.to_vec(), &(*conn)) {
Ok(mut o) => {
Expand Down Expand Up @@ -173,6 +174,18 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
None
},
},
DbKey::OutputsByTxIdAndStatus(tx_id, status) => {
let mut outputs = OutputSql::find_by_tx_id_and_status(*tx_id, *status, &(*conn))?;
for o in outputs.iter_mut() {
self.decrypt_if_necessary(o)?;
}
Some(DbValue::AnyOutputs(
outputs
.iter()
.map(|o| DbUnblindedOutput::try_from(o.clone()))
.collect::<Result<Vec<_>, _>>()?,
))
},
DbKey::UnspentOutputs => {
let mut outputs = OutputSql::index_status(OutputStatus::Unspent, &(*conn))?;
for o in outputs.iter_mut() {
Expand Down Expand Up @@ -337,6 +350,20 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
self.encrypt_if_necessary(&mut script_sql)?;
script_sql.commit(&(*conn))?
},
DbKeyValuePair::UpdateOutputStatus(commitment, status) => {
let output = OutputSql::find_by_commitment(&commitment.to_vec(), &(*conn))?;
output.update(
UpdateOutput {
status: Some(status),
tx_id: None,
spending_key: None,
script_private_key: None,
metadata_signature_nonce: None,
metadata_signature_u_key: None,
},
&(*conn),
)?;
},
},
WriteOperation::Remove(k) => match k {
DbKey::SpentOutput(s) => match OutputSql::find_status(&s.to_vec(), OutputStatus::Spent, &(*conn)) {
Expand Down Expand Up @@ -409,6 +436,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
DbKey::InvalidOutputs => return Err(OutputManagerStorageError::OperationNotSupported),
DbKey::TimeLockedUnspentOutputs(_) => return Err(OutputManagerStorageError::OperationNotSupported),
DbKey::KnownOneSidedPaymentScripts => return Err(OutputManagerStorageError::OperationNotSupported),
DbKey::OutputsByTxIdAndStatus(_, _) => return Err(OutputManagerStorageError::OperationNotSupported),
},
}

Expand Down Expand Up @@ -840,17 +868,6 @@ fn pending_transaction_outputs_from_sql_outputs(
})
}

/// The status of a given output
#[derive(PartialEq)]
enum OutputStatus {
Unspent,
Spent,
EncumberedToBeReceived,
EncumberedToBeSpent,
Invalid,
CancelledInbound,
}

impl TryFrom<i32> for OutputStatus {
type Error = OutputManagerStorageError;

Expand Down Expand Up @@ -1011,6 +1028,17 @@ impl OutputSql {
Ok(request.first::<OutputSql>(conn)?)
}

pub fn find_by_tx_id_and_status(
tx_id: TxId,
status: OutputStatus,
conn: &SqliteConnection,
) -> Result<Vec<OutputSql>, OutputManagerStorageError> {
Ok(outputs::table
.filter(outputs::tx_id.eq(Some(tx_id as i64)))
.filter(outputs::status.eq(status as i32))
.load(conn)?)
}

/// Find outputs via tx_id that are encumbered. Any outputs that are encumbered cannot be marked as spent.
pub fn find_by_tx_id_and_encumbered(
tx_id: TxId,
Expand Down
Loading

0 comments on commit 35bab9f

Please sign in to comment.