diff --git a/applications/tari_app_grpc/proto/wallet.proto b/applications/tari_app_grpc/proto/wallet.proto index 7fd6d3606f..bc561f0e02 100644 --- a/applications/tari_app_grpc/proto/wallet.proto +++ b/applications/tari_app_grpc/proto/wallet.proto @@ -50,6 +50,8 @@ service Wallet { rpc GetNetworkStatus(Empty) returns (NetworkStatusResponse); // List currently connected peers rpc ListConnectedPeers(Empty) returns (ListConnectedPeersResponse); + // Cancel pending transaction + rpc CancelTransaction (CancelTransactionRequest) returns (CancelTransactionResponse); } message GetVersionRequest { } @@ -185,3 +187,11 @@ message ImportUtxosResponse { repeated uint64 tx_ids = 1; } +message CancelTransactionRequest { + uint64 tx_id = 1; +} + +message CancelTransactionResponse { + bool is_success = 1; + string failure_message = 2; +} \ No newline at end of file diff --git a/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs b/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs index 34797d8d92..b7e53ae6a2 100644 --- a/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs +++ b/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs @@ -393,6 +393,33 @@ impl wallet_server::Wallet for WalletGrpcServer { Ok(Response::new(resp)) } + + async fn cancel_transaction( + &self, + request: Request, + ) -> Result, Status> { + let message = request.into_inner(); + debug!( + target: LOG_TARGET, + "Incoming gRPC request to Cancel Transaction (TxId: {})", message.tx_id, + ); + let mut transaction_service = self.get_transaction_service(); + + match transaction_service.cancel_transaction(message.tx_id).await { + Ok(_) => { + return Ok(Response::new(tari_rpc::CancelTransactionResponse { + is_success: true, + failure_message: "".to_string(), + })) + }, + Err(e) => { + return Ok(Response::new(tari_rpc::CancelTransactionResponse { + is_success: false, + failure_message: e.to_string(), + })) + }, + } + } } fn convert_wallet_transaction_into_transaction_info( diff --git a/base_layer/wallet/src/output_manager_service/handle.rs b/base_layer/wallet/src/output_manager_service/handle.rs index 8b4ae008f7..367ce3d799 100644 --- a/base_layer/wallet/src/output_manager_service/handle.rs +++ b/base_layer/wallet/src/output_manager_service/handle.rs @@ -76,6 +76,7 @@ pub enum OutputManagerRequest { ScanForRecoverableOutputs(Vec), ScanOutputs(Vec), AddKnownOneSidedPaymentScript(KnownOneSidedPaymentScript), + ReinstateCancelledInboundTx(TxId), } impl fmt::Display for OutputManagerRequest { @@ -115,6 +116,7 @@ impl fmt::Display for OutputManagerRequest { ScanForRecoverableOutputs(_) => write!(f, "ScanForRecoverableOutputs"), ScanOutputs(_) => write!(f, "ScanRewindAndImportOutputs"), AddKnownOneSidedPaymentScript(_) => write!(f, "AddKnownOneSidedPaymentScript"), + ReinstateCancelledInboundTx(_) => write!(f, "ReinstateCancelledInboundTx"), } } } @@ -149,6 +151,7 @@ pub enum OutputManagerResponse { RewoundOutputs(Vec), ScanOutputs(Vec), AddKnownOneSidedPaymentScript, + ReinstatedCancelledInboundTx, } pub type OutputManagerEventSender = broadcast::Sender>; @@ -545,4 +548,15 @@ impl OutputManagerHandle { _ => Err(OutputManagerError::UnexpectedApiResponse), } } + + pub async fn reinstate_cancelled_inbound_transaction(&mut self, tx_id: TxId) -> Result<(), OutputManagerError> { + match self + .handle + .call(OutputManagerRequest::ReinstateCancelledInboundTx(tx_id)) + .await?? + { + OutputManagerResponse::ReinstatedCancelledInboundTx => Ok(()), + _ => Err(OutputManagerError::UnexpectedApiResponse), + } + } } diff --git a/base_layer/wallet/src/output_manager_service/service.rs b/base_layer/wallet/src/output_manager_service/service.rs index 120f26b7e2..a914d99713 100644 --- a/base_layer/wallet/src/output_manager_service/service.rs +++ b/base_layer/wallet/src/output_manager_service/service.rs @@ -40,6 +40,7 @@ use crate::{ types::{HashDigest, ValidationRetryStrategy}, }; use blake2::Digest; +use chrono::Utc; use diesel::result::{DatabaseErrorKind, Error as DieselError}; use futures::{pin_mut, StreamExt}; use log::*; @@ -343,6 +344,10 @@ where TBackend: OutputManagerBackend + 'static .add_known_script(known_script) .await .map(|_| OutputManagerResponse::AddKnownOneSidedPaymentScript), + OutputManagerRequest::ReinstateCancelledInboundTx(tx_id) => self + .reinstate_cancelled_inbound_transaction(tx_id) + .await + .map(|_| OutputManagerResponse::ReinstatedCancelledInboundTx), } } @@ -896,6 +901,27 @@ where TBackend: OutputManagerBackend + 'static Ok(self.resources.db.cancel_pending_transaction_outputs(tx_id).await?) } + /// Restore the pending transaction encumberance and output for an inbound transaction that was previously + /// cancelled. + async fn reinstate_cancelled_inbound_transaction(&mut self, tx_id: TxId) -> Result<(), OutputManagerError> { + self.resources.db.reinstate_inbound_output(tx_id).await?; + + self.resources + .db + .add_pending_transaction_outputs(PendingTransactionOutputs { + tx_id, + outputs_to_be_spent: Vec::new(), + outputs_to_be_received: Vec::new(), + timestamp: Utc::now().naive_utc(), + coinbase_block_height: None, + }) + .await?; + + self.confirm_encumberance(tx_id).await?; + + Ok(()) + } + /// Go through the pending transaction and if any have existed longer than the specified duration, cancel them async fn timeout_pending_transactions(&mut self, period: Duration) -> Result<(), OutputManagerError> { Ok(self.resources.db.timeout_pending_transaction_outputs(period).await?) diff --git a/base_layer/wallet/src/output_manager_service/storage/database.rs b/base_layer/wallet/src/output_manager_service/storage/database.rs index d67a2a8219..054d010b70 100644 --- a/base_layer/wallet/src/output_manager_service/storage/database.rs +++ b/base_layer/wallet/src/output_manager_service/storage/database.rs @@ -23,7 +23,7 @@ use crate::output_manager_service::{ error::OutputManagerStorageError, service::Balance, - storage::models::{DbUnblindedOutput, KnownOneSidedPaymentScript}, + storage::models::{DbUnblindedOutput, KnownOneSidedPaymentScript, OutputStatus}, TxId, }; use aes_gcm::Aes256Gcm; @@ -135,6 +135,7 @@ pub enum DbKey { KeyManagerState, InvalidOutputs, KnownOneSidedPaymentScripts, + OutputsByTxIdAndStatus(TxId, OutputStatus), } #[derive(Debug)] @@ -149,6 +150,7 @@ pub enum DbValue { KeyManagerState(KeyManagerState), KnownOneSidedPaymentScripts(Vec), AnyOutput(Box), + AnyOutputs(Vec), } pub enum DbKeyValuePair { @@ -158,6 +160,7 @@ pub enum DbKeyValuePair { PendingTransactionOutputs(TxId, Box), KeyManagerState(KeyManagerState), KnownOneSidedPaymentScripts(KnownOneSidedPaymentScript), + UpdateOutputStatus(Commitment, OutputStatus), } pub enum WriteOperation { @@ -713,6 +716,48 @@ where T: OutputManagerBackend + 'static .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) } + + /// Check if a single cancelled inbound output exists that matches this TxID, if it does then return its status to + /// EncumberedToBeReceived + pub async fn reinstate_inbound_output(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError> { + let db_clone = self.db.clone(); + let outputs = tokio::task::spawn_blocking(move || { + match db_clone.fetch(&DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound)) { + Ok(None) => Err(OutputManagerStorageError::ValueNotFound), + Ok(Some(DbValue::AnyOutputs(o))) => Ok(o), + Ok(Some(other)) => unexpected_result( + DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound), + other, + ), + Err(e) => log_error(DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound), e), + } + }) + .await + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())) + .and_then(|inner_result| inner_result)?; + + if outputs.len() != 1 { + return Err(OutputManagerStorageError::UnexpectedResult( + "There should be only 1 output for a cancelled inbound transaction but more were found".to_string(), + )); + } + let db_clone2 = self.db.clone(); + + tokio::task::spawn_blocking(move || { + db_clone2.write(WriteOperation::Insert(DbKeyValuePair::UpdateOutputStatus( + outputs + .first() + .expect("Must be only one element in outputs") + .commitment + .clone(), + OutputStatus::EncumberedToBeReceived, + ))) + }) + .await + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; + + Ok(()) + } } fn unexpected_result(req: DbKey, res: DbValue) -> Result { @@ -737,6 +782,7 @@ impl Display for DbKey { DbKey::TimeLockedUnspentOutputs(_t) => f.write_str(&"Timelocked Outputs"), DbKey::KnownOneSidedPaymentScripts => f.write_str(&"Known claiming scripts"), DbKey::AnyOutputByCommitment(_) => f.write_str(&"AnyOutputByCommitment"), + DbKey::OutputsByTxIdAndStatus(_, _) => f.write_str(&"OutputsByTxIdAndStatus"), } } } @@ -754,6 +800,7 @@ impl Display for DbValue { DbValue::InvalidOutputs(_) => f.write_str("Invalid Outputs"), DbValue::KnownOneSidedPaymentScripts(_) => f.write_str(&"Known claiming scripts"), DbValue::AnyOutput(_) => f.write_str(&"Any Output"), + DbValue::AnyOutputs(_) => f.write_str(&"Any Outputs"), } } } diff --git a/base_layer/wallet/src/output_manager_service/storage/models.rs b/base_layer/wallet/src/output_manager_service/storage/models.rs index f276f1d2df..dd36eb6934 100644 --- a/base_layer/wallet/src/output_manager_service/storage/models.rs +++ b/base_layer/wallet/src/output_manager_service/storage/models.rs @@ -105,3 +105,14 @@ impl PartialEq for KnownOneSidedPaymentScript { self.script_hash == other.script_hash } } + +/// The status of a given output +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum OutputStatus { + Unspent, + Spent, + EncumberedToBeReceived, + EncumberedToBeSpent, + Invalid, + CancelledInbound, +} diff --git a/base_layer/wallet/src/output_manager_service/storage/sqlite_db.rs b/base_layer/wallet/src/output_manager_service/storage/sqlite_db.rs index df5b2c89da..820edcb525 100644 --- a/base_layer/wallet/src/output_manager_service/storage/sqlite_db.rs +++ b/base_layer/wallet/src/output_manager_service/storage/sqlite_db.rs @@ -33,7 +33,7 @@ use crate::{ PendingTransactionOutputs, WriteOperation, }, - models::{DbUnblindedOutput, KnownOneSidedPaymentScript}, + models::{DbUnblindedOutput, KnownOneSidedPaymentScript, OutputStatus}, }, TxId, }, @@ -135,6 +135,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { None }, }, + DbKey::AnyOutputByCommitment(commitment) => { match OutputSql::find_by_commitment(&commitment.to_vec(), &(*conn)) { Ok(mut o) => { @@ -173,6 +174,18 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { None }, }, + DbKey::OutputsByTxIdAndStatus(tx_id, status) => { + let mut outputs = OutputSql::find_by_tx_id_and_status(*tx_id, *status, &(*conn))?; + for o in outputs.iter_mut() { + self.decrypt_if_necessary(o)?; + } + Some(DbValue::AnyOutputs( + outputs + .iter() + .map(|o| DbUnblindedOutput::try_from(o.clone())) + .collect::, _>>()?, + )) + }, DbKey::UnspentOutputs => { let mut outputs = OutputSql::index_status(OutputStatus::Unspent, &(*conn))?; for o in outputs.iter_mut() { @@ -337,6 +350,20 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { self.encrypt_if_necessary(&mut script_sql)?; script_sql.commit(&(*conn))? }, + DbKeyValuePair::UpdateOutputStatus(commitment, status) => { + let output = OutputSql::find_by_commitment(&commitment.to_vec(), &(*conn))?; + output.update( + UpdateOutput { + status: Some(status), + tx_id: None, + spending_key: None, + script_private_key: None, + metadata_signature_nonce: None, + metadata_signature_u_key: None, + }, + &(*conn), + )?; + }, }, WriteOperation::Remove(k) => match k { DbKey::SpentOutput(s) => match OutputSql::find_status(&s.to_vec(), OutputStatus::Spent, &(*conn)) { @@ -409,6 +436,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { DbKey::InvalidOutputs => return Err(OutputManagerStorageError::OperationNotSupported), DbKey::TimeLockedUnspentOutputs(_) => return Err(OutputManagerStorageError::OperationNotSupported), DbKey::KnownOneSidedPaymentScripts => return Err(OutputManagerStorageError::OperationNotSupported), + DbKey::OutputsByTxIdAndStatus(_, _) => return Err(OutputManagerStorageError::OperationNotSupported), }, } @@ -840,17 +868,6 @@ fn pending_transaction_outputs_from_sql_outputs( }) } -/// The status of a given output -#[derive(PartialEq)] -enum OutputStatus { - Unspent, - Spent, - EncumberedToBeReceived, - EncumberedToBeSpent, - Invalid, - CancelledInbound, -} - impl TryFrom for OutputStatus { type Error = OutputManagerStorageError; @@ -1011,6 +1028,17 @@ impl OutputSql { Ok(request.first::(conn)?) } + pub fn find_by_tx_id_and_status( + tx_id: TxId, + status: OutputStatus, + conn: &SqliteConnection, + ) -> Result, OutputManagerStorageError> { + Ok(outputs::table + .filter(outputs::tx_id.eq(Some(tx_id as i64))) + .filter(outputs::status.eq(status as i32)) + .load(conn)?) + } + /// Find outputs via tx_id that are encumbered. Any outputs that are encumbered cannot be marked as spent. pub fn find_by_tx_id_and_encumbered( tx_id: TxId, diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index 2068d96eac..17cc84c7df 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -377,7 +377,11 @@ where "Handling Transaction Finalized Message, Trace: {}", msg.dht_header.message_tag.as_value() ); - let result = self.accept_finalized_transaction(origin_public_key, inner_msg, ).await; + let result = self.accept_finalized_transaction( + origin_public_key, + inner_msg, + &mut receive_transaction_protocol_handles, + ).await; match result { Err(TransactionServiceError::TransactionDoesNotExistError) => { @@ -1300,6 +1304,7 @@ where &mut self, source_pubkey: CommsPublicKey, finalized_transaction: proto::TransactionFinalizedMessage, + join_handles: &mut FuturesUnordered>>, ) -> Result<(), TransactionServiceError> { let tx_id = finalized_transaction.tx_id; let transaction: Transaction = finalized_transaction @@ -1317,7 +1322,39 @@ where })?; let sender = match self.finalized_transaction_senders.get_mut(&tx_id) { - None => return Err(TransactionServiceError::TransactionDoesNotExistError), + None => { + // First check if perhaps we know about this inbound transaction but it was cancelled + match self.db.get_cancelled_pending_inbound_transaction(tx_id).await { + Ok(t) => { + if t.source_public_key != source_pubkey { + debug!( + target: LOG_TARGET, + "Received Finalized Transaction for a cancelled pending Inbound Transaction (TxId: \ + {}) but Source Public Key did not match", + tx_id + ); + return Err(TransactionServiceError::TransactionDoesNotExistError); + } + info!( + target: LOG_TARGET, + "Received Finalized Transaction for a cancelled pending Inbound Transaction (TxId: {}). \ + Restarting protocol", + tx_id + ); + self.db.uncancel_pending_transaction(tx_id).await?; + self.output_manager_service + .reinstate_cancelled_inbound_transaction(tx_id) + .await?; + + self.restart_receive_transaction_protocol(tx_id, source_pubkey.clone(), join_handles); + match self.finalized_transaction_senders.get_mut(&tx_id) { + None => return Err(TransactionServiceError::TransactionDoesNotExistError), + Some(s) => s, + } + }, + Err(_) => return Err(TransactionServiceError::TransactionDoesNotExistError), + } + }, Some(s) => s, }; @@ -1401,34 +1438,43 @@ where ) -> Result<(), TransactionServiceError> { let inbound_txs = self.db.get_pending_inbound_transactions().await?; for (tx_id, tx) in inbound_txs { - if !self.pending_transaction_reply_senders.contains_key(&tx_id) { - debug!( - target: LOG_TARGET, - "Restarting listening for Transaction Finalize for Pending Inbound Transaction TxId: {}", tx_id - ); - let (tx_finalized_sender, tx_finalized_receiver) = mpsc::channel(100); - let (cancellation_sender, cancellation_receiver) = oneshot::channel(); - self.finalized_transaction_senders.insert(tx_id, tx_finalized_sender); - self.receiver_transaction_cancellation_senders - .insert(tx_id, cancellation_sender); - let protocol = TransactionReceiveProtocol::new( - tx_id, - tx.source_public_key, - TransactionSenderMessage::None, - TransactionReceiveProtocolStage::WaitForFinalize, - self.resources.clone(), - tx_finalized_receiver, - cancellation_receiver, - ); - - let join_handle = tokio::spawn(protocol.execute()); - join_handles.push(join_handle); - } + self.restart_receive_transaction_protocol(tx_id, tx.source_public_key.clone(), join_handles); } Ok(()) } + fn restart_receive_transaction_protocol( + &mut self, + tx_id: TxId, + source_public_key: CommsPublicKey, + join_handles: &mut FuturesUnordered>>, + ) { + if !self.pending_transaction_reply_senders.contains_key(&tx_id) { + debug!( + target: LOG_TARGET, + "Restarting listening for Transaction Finalize for Pending Inbound Transaction TxId: {}", tx_id + ); + let (tx_finalized_sender, tx_finalized_receiver) = mpsc::channel(100); + let (cancellation_sender, cancellation_receiver) = oneshot::channel(); + self.finalized_transaction_senders.insert(tx_id, tx_finalized_sender); + self.receiver_transaction_cancellation_senders + .insert(tx_id, cancellation_sender); + let protocol = TransactionReceiveProtocol::new( + tx_id, + source_public_key, + TransactionSenderMessage::None, + TransactionReceiveProtocolStage::WaitForFinalize, + self.resources.clone(), + tx_finalized_receiver, + cancellation_receiver, + ); + + let join_handle = tokio::spawn(protocol.execute()); + join_handles.push(join_handle); + } + } + /// Add a base node public key to the list that will be used to broadcast transactions and monitor the base chain /// for the presence of spendable outputs. If this is the first time the base node public key is set do the initial /// mempool broadcast diff --git a/base_layer/wallet/src/transaction_service/storage/database.rs b/base_layer/wallet/src/transaction_service/storage/database.rs index b6844a7c44..df03327e19 100644 --- a/base_layer/wallet/src/transaction_service/storage/database.rs +++ b/base_layer/wallet/src/transaction_service/storage/database.rs @@ -90,8 +90,12 @@ pub trait TransactionBackend: Send + Sync + Clone { fn set_completed_transaction_validity(&self, tx_id: TxId, valid: bool) -> Result<(), TransactionStorageError>; /// Cancel Completed transaction, this will update the transaction status fn cancel_completed_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError>; - /// Cancel Completed transaction, this will update the transaction status - fn cancel_pending_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError>; + /// Set cancellation on Pending transaction, this will update the transaction status + fn set_pending_transaction_cancellation_status( + &self, + tx_id: TxId, + cancelled: bool, + ) -> Result<(), TransactionStorageError>; /// Search all pending transaction for the provided tx_id and if it exists return the public key of the counterparty fn get_pending_transaction_counterparty_pub_key_by_tx_id( &self, @@ -563,7 +567,15 @@ where T: TransactionBackend + 'static pub async fn cancel_pending_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || db_clone.cancel_pending_transaction(tx_id)) + tokio::task::spawn_blocking(move || db_clone.set_pending_transaction_cancellation_status(tx_id, true)) + .await + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + Ok(()) + } + + pub async fn uncancel_pending_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { + let db_clone = self.db.clone(); + tokio::task::spawn_blocking(move || db_clone.set_pending_transaction_cancellation_status(tx_id, false)) .await .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) diff --git a/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs b/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs index 841eb96510..f4e77e342d 100644 --- a/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs +++ b/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs @@ -576,16 +576,20 @@ impl TransactionBackend for TransactionServiceSqliteDatabase { Ok(()) } - fn cancel_pending_transaction(&self, tx_id: u64) -> Result<(), TransactionStorageError> { + fn set_pending_transaction_cancellation_status( + &self, + tx_id: u64, + cancelled: bool, + ) -> Result<(), TransactionStorageError> { let conn = self.database_connection.acquire_lock(); - match InboundTransactionSql::find_by_cancelled(tx_id, false, &(*conn)) { + match InboundTransactionSql::find(tx_id, &(*conn)) { Ok(v) => { - v.cancel(&(*conn))?; + v.set_cancelled(cancelled, &(*conn))?; }, Err(_) => { - match OutboundTransactionSql::find_by_cancelled(tx_id, false, &(*conn)) { + match OutboundTransactionSql::find(tx_id, &(*conn)) { Ok(v) => { - v.cancel(&(*conn))?; + v.set_cancelled(cancelled, &(*conn))?; }, Err(TransactionStorageError::DieselError(DieselError::NotFound)) => { return Err(TransactionStorageError::ValuesNotFound); @@ -1019,10 +1023,10 @@ impl InboundTransactionSql { Ok(()) } - pub fn cancel(&self, conn: &SqliteConnection) -> Result<(), TransactionStorageError> { + pub fn set_cancelled(&self, cancelled: bool, conn: &SqliteConnection) -> Result<(), TransactionStorageError> { self.update( UpdateInboundTransactionSql { - cancelled: Some(1i32), + cancelled: Some(cancelled as i32), direct_send_success: None, receiver_protocol: None, send_count: None, @@ -1202,10 +1206,10 @@ impl OutboundTransactionSql { Ok(()) } - pub fn cancel(&self, conn: &SqliteConnection) -> Result<(), TransactionStorageError> { + pub fn set_cancelled(&self, cancelled: bool, conn: &SqliteConnection) -> Result<(), TransactionStorageError> { self.update( UpdateOutboundTransactionSql { - cancelled: Some(1i32), + cancelled: Some(cancelled as i32), direct_send_success: None, sender_protocol: None, send_count: None, @@ -1986,23 +1990,34 @@ mod test { assert!(InboundTransactionSql::find_by_cancelled(inbound_tx1.tx_id, true, &conn).is_err()); InboundTransactionSql::try_from(inbound_tx1.clone()) .unwrap() - .cancel(&conn) + .set_cancelled(true, &conn) .unwrap(); assert!(InboundTransactionSql::find_by_cancelled(inbound_tx1.tx_id, false, &conn).is_err()); assert!(InboundTransactionSql::find_by_cancelled(inbound_tx1.tx_id, true, &conn).is_ok()); - + InboundTransactionSql::try_from(inbound_tx1.clone()) + .unwrap() + .set_cancelled(false, &conn) + .unwrap(); + assert!(InboundTransactionSql::find_by_cancelled(inbound_tx1.tx_id, true, &conn).is_err()); + assert!(InboundTransactionSql::find_by_cancelled(inbound_tx1.tx_id, false, &conn).is_ok()); OutboundTransactionSql::try_from(outbound_tx1.clone()) .unwrap() .commit(&conn) .unwrap(); assert!(OutboundTransactionSql::find_by_cancelled(outbound_tx1.tx_id, true, &conn).is_err()); - OutboundTransactionSql::try_from(outbound_tx1) + OutboundTransactionSql::try_from(outbound_tx1.clone()) .unwrap() - .cancel(&conn) + .set_cancelled(true, &conn) .unwrap(); - assert!(InboundTransactionSql::find_by_cancelled(inbound_tx1.tx_id, false, &conn).is_err()); - assert!(InboundTransactionSql::find_by_cancelled(inbound_tx1.tx_id, true, &conn).is_ok()); + assert!(OutboundTransactionSql::find_by_cancelled(outbound_tx1.tx_id, false, &conn).is_err()); + assert!(OutboundTransactionSql::find_by_cancelled(outbound_tx1.tx_id, true, &conn).is_ok()); + OutboundTransactionSql::try_from(outbound_tx1.clone()) + .unwrap() + .set_cancelled(false, &conn) + .unwrap(); + assert!(OutboundTransactionSql::find_by_cancelled(outbound_tx1.tx_id, true, &conn).is_err()); + assert!(OutboundTransactionSql::find_by_cancelled(outbound_tx1.tx_id, false, &conn).is_ok()); CompletedTransactionSql::try_from(completed_tx1.clone()) .unwrap() diff --git a/base_layer/wallet/tests/output_manager_service/service.rs b/base_layer/wallet/tests/output_manager_service/service.rs index c3110e59f2..98d061e23b 100644 --- a/base_layer/wallet/tests/output_manager_service/service.rs +++ b/base_layer/wallet/tests/output_manager_service/service.rs @@ -84,6 +84,7 @@ use tari_wallet::{ types::ValidationRetryStrategy, }; +use tari_wallet::output_manager_service::storage::models::OutputStatus; use tokio::{ runtime::Runtime, sync::{broadcast, broadcast::channel}, @@ -855,6 +856,59 @@ fn cancel_transaction() { assert_eq!(runtime.block_on(oms.get_unspent_outputs()).unwrap().len(), num_outputs); } +#[test] +fn cancel_transaction_and_reinstate_inbound_tx() { + let mut runtime = Runtime::new().unwrap(); + + let (connection, _tempdir) = get_temp_sqlite_database_connection(); + let backend = OutputManagerSqliteDatabase::new(connection, None); + + let (mut oms, _shutdown, _, _, _, _, _) = setup_output_manager_service(&mut runtime, backend.clone(), true); + + let value = MicroTari::from(5000); + let (tx_id, sender_message) = generate_sender_transaction_message(value); + let _rtp = runtime.block_on(oms.get_recipient_transaction(sender_message)).unwrap(); + assert_eq!(runtime.block_on(oms.get_unspent_outputs()).unwrap().len(), 0); + + let pending_txs = runtime.block_on(oms.get_pending_transactions()).unwrap(); + + assert_eq!(pending_txs.len(), 1); + + let output = pending_txs + .get(&tx_id) + .unwrap() + .outputs_to_be_received + .first() + .unwrap() + .clone(); + + runtime.block_on(oms.cancel_transaction(tx_id)).unwrap(); + + let cancelled_output = backend + .fetch(&DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound)) + .unwrap() + .unwrap(); + + if let DbValue::AnyOutputs(o) = cancelled_output { + let o = o.first().expect("Should be one output in here"); + assert_eq!(o.commitment, output.commitment); + } else { + assert!(false, "Should have found cancelled output"); + } + + assert_eq!(runtime.block_on(oms.get_pending_transactions()).unwrap().len(), 0); + + runtime + .block_on(oms.reinstate_cancelled_inbound_transaction(tx_id)) + .unwrap(); + + assert_eq!(runtime.block_on(oms.get_pending_transactions()).unwrap().len(), 1); + + let balance = runtime.block_on(oms.get_balance()).unwrap(); + + assert_eq!(balance.pending_incoming_balance, value); +} + #[test] fn timeout_transaction() { let factories = CryptoFactories::default(); diff --git a/clients/wallet_grpc_client/index.js b/clients/wallet_grpc_client/index.js index 15347770d2..d638ba1f5c 100644 --- a/clients/wallet_grpc_client/index.js +++ b/clients/wallet_grpc_client/index.js @@ -39,6 +39,7 @@ function Client(address) { "importUtxos", "listConnectedPeers", "getNetworkStatus", + "cancelTransaction", ]; this.waitForReady = (...args) => { diff --git a/integration_tests/features/WalletTransactions.feature b/integration_tests/features/WalletTransactions.feature index 4f40a3e317..434de8f74c 100644 --- a/integration_tests/features/WalletTransactions.feature +++ b/integration_tests/features/WalletTransactions.feature @@ -161,4 +161,42 @@ Feature: Wallet Transactions Then I wait for wallet WALLET_B to have at least 500000 uT Then I check if wallet WALLET_B has 5 transactions Then I restart wallet WALLET_B - Then I check if wallet WALLET_B has 5 transactions \ No newline at end of file + Then I check if wallet WALLET_B has 5 transactions + + Scenario: Wallet SAF negotiation and cancellation with offline peers + Given I have a seed node NODE + And I have 1 base nodes connected to all seed nodes + And I have wallet WALLET_A connected to all seed nodes + And I have mining node MINER connected to base node NODE and wallet WALLET_A + And mining node MINER mines 5 blocks + Then all nodes are at height 5 + Then I wait for wallet WALLET_A to have at least 10000000000 uT + And I have non-default wallet WALLET_SENDER connected to all seed nodes using StoreAndForwardOnly + And I send 100000000 uT from wallet WALLET_A to wallet WALLET_SENDER at fee 100 + When wallet WALLET_SENDER detects all transactions are at least Broadcast + And mining node MINER mines 5 blocks + Then all nodes are at height 10 + Then I wait for wallet WALLET_SENDER to have at least 100000000 uT + And I have wallet WALLET_RECV connected to all seed nodes + And I stop wallet WALLET_RECV + And I send 1000000 uT from wallet WALLET_SENDER to wallet WALLET_RECV at fee 100 + When wallet WALLET_SENDER detects last transaction is Pending + Then I stop wallet WALLET_SENDER + And I start wallet WALLET_RECV + And I wait for 5 seconds + When wallet WALLET_RECV detects all transactions are at least Pending + Then I cancel last transaction in wallet WALLET_RECV + Then I stop wallet WALLET_RECV + And I start wallet WALLET_SENDER + # This is a weirdness that I haven't been able to figure out. When you start WALLET_SENDER on the line above it + # requests SAF messages from the base nodes the base nodes get the request and attempt to send the stored messages + # but the connection fails. It requires a second reconnection and request for the SAF messages to be delivered. + And I wait for 5 seconds + Then I restart wallet WALLET_SENDER + When wallet WALLET_SENDER detects all transactions are at least Broadcast + And mining node MINER mines 5 blocks + Then all nodes are at height 15 + When wallet WALLET_SENDER detects all transactions as Mined_Confirmed + And I start wallet WALLET_RECV + Then I restart wallet WALLET_RECV + Then I wait for wallet WALLET_RECV to have at least 1000000 uT \ No newline at end of file diff --git a/integration_tests/features/support/steps.js b/integration_tests/features/support/steps.js index 91499f7881..9212921883 100644 --- a/integration_tests/features/support/steps.js +++ b/integration_tests/features/support/steps.js @@ -344,6 +344,9 @@ Given( wallet.setPeerSeeds([this.seedAddresses()]); await wallet.startNew(); this.addWallet(name, wallet); + let walletClient = await this.getWallet(name).connectClient(); + let walletInfo = await walletClient.identify(); + this.addWalletPubkey(name, walletInfo.public_key); } ); @@ -419,6 +422,11 @@ When(/I stop wallet (.*)/, async function (walletName) { await wallet.stop(); }); +When(/I start wallet (.*)/, async function (walletName) { + let wallet = this.getWallet(walletName); + await wallet.start(); +}); + When(/I restart wallet (.*)/, async function (walletName) { let wallet = this.getWallet(walletName); await wallet.stop(); @@ -700,7 +708,7 @@ Then("Proxy response for block header by hash is valid", function () { assert(lastResult.result.status, "OK"); }); -When(/I start (.*)/, { timeout: 20 * 1000 }, async function (name) { +When(/I start base node (.*)/, { timeout: 20 * 1000 }, async function (name) { await this.startNode(name); }); @@ -1388,7 +1396,8 @@ Then( async function send_tari( sourceWallet, - destWallet, + destWalletName, + destWalletPubkey, tariAmount, feePerGram, oneSided = false, @@ -1396,21 +1405,18 @@ async function send_tari( printMessage = true ) { const sourceWalletClient = await sourceWallet.connectClient(); - const destClient = await destWallet.connectClient(); - const destInfo = await destClient.identify(); - if (message === "") { - message = - sourceWallet.name + + console.log( + sourceWallet.name + " sending " + tariAmount + "uT one-sided(" + oneSided + ") to " + - destWallet.name + + destWalletName + " `" + - destInfo.public_key + - "`"; - } + destWalletPubkey + + "`" + ); if (printMessage) { console.log(message); } @@ -1426,7 +1432,7 @@ async function send_tari( lastResult = await sourceWalletClient.transfer({ recipients: [ { - address: destInfo.public_key, + address: destWalletPubkey, amount: tariAmount, fee_per_gram: feePerGram, message: message, @@ -1437,7 +1443,7 @@ async function send_tari( lastResult = await sourceWalletClient.transfer({ recipients: [ { - address: destInfo.public_key, + address: destWalletPubkey, amount: tariAmount, fee_per_gram: feePerGram, message: message, @@ -1486,12 +1492,12 @@ When( const sourceClient = await sourceWallet.connectClient(); const sourceInfo = await sourceClient.identify(); - const destWallet = this.getWallet(dest); - const destClient = await destWallet.connectClient(); - const destInfo = await destClient.identify(); + const destPublicKey = this.getWalletPubkey(dest); + this.lastResult = await send_tari( sourceWallet, - destWallet, + dest, + destPublicKey, tariAmount, feePerGram ); @@ -1501,7 +1507,7 @@ When( this.lastResult.results[0].transaction_id ); this.addTransaction( - destInfo.public_key, + destPublicKey, this.lastResult.results[0].transaction_id ); console.log( @@ -1526,7 +1532,8 @@ When( for (let i = 0; i < number; i++) { this.lastResult = await send_tari( this.getWallet(source), - this.getWallet(dest), + destInfo.name, + destInfo.public_key, tariAmount, fee ); @@ -1560,7 +1567,8 @@ When( const destInfo = await destClient.identify(); this.lastResult = await send_tari( this.getWallet(source), - this.getWallet(wallet), + destInfo.name, + destInfo.public_key, tariAmount, fee ); @@ -1683,7 +1691,8 @@ When( const sourceInfo = await sourceClient.identify(); this.lastResult = await send_tari( this.getWallet(source), - this.getWallet(source), + sourceInfo.name, + sourceInfo.public_key, tariAmount, feePerGram ); @@ -1764,6 +1773,31 @@ When( } ); +When( + /I cancel last transaction in wallet (.*)/, + { timeout: 25 * 5 * 1000 }, + async function (walletName) { + const wallet = this.getWallet(walletName); + const walletClient = await wallet.connectClient(); + + let lastTxId = this.lastResult.results[0].transaction_id; + console.log( + "Attempting to cancel transaction ", + lastTxId, + "from wallet", + walletName + ); + + let result = await walletClient.cancelTransaction(lastTxId); + console.log( + "Cancellation successful? ", + result.success, + result.failure_message + ); + assert(result.success, true); + } +); + When(/I wait (.*) seconds/, { timeout: 600 * 1000 }, async function (int) { console.log("Waiting for", int, "seconds"); await sleep(int * 1000); @@ -1925,6 +1959,36 @@ Then( } ); +Then( + /wallet (.*) detects last transaction is Pending/, + { timeout: 3800 * 1000 }, + async function (walletName) { + const wallet = this.getWallet(walletName); + const walletClient = await wallet.connectClient(); + + let lastTxId = this.lastResult.results[0].transaction_id; + console.log( + "Waiting for Transaction ", + lastTxId, + "to be pending in wallet", + walletName + ); + + await waitFor( + async () => walletClient.isTransactionPending(lastTxId), + true, + 3700 * 1000, + 5 * 1000, + 5 + ); + const transactionPending = await walletClient.isTransactionPending( + lastTxId + ); + + expect(transactionPending).to.equal(true); + } +); + Then( /wallet (.*) detects all transactions are at least Completed/, { timeout: 1200 * 1000 }, diff --git a/integration_tests/features/support/world.js b/integration_tests/features/support/world.js index 720599eb7d..7df9274077 100644 --- a/integration_tests/features/support/world.js +++ b/integration_tests/features/support/world.js @@ -103,6 +103,10 @@ class CustomWorld { this.wallets[name] = process; } + addWalletPubkey(name, pubkey) { + this.walletPubkeys[name] = pubkey; + } + addOutput(name, output) { this.outputs[name] = output; } diff --git a/integration_tests/helpers/walletClient.js b/integration_tests/helpers/walletClient.js index e172ba6b3e..02bb9152b3 100644 --- a/integration_tests/helpers/walletClient.js +++ b/integration_tests/helpers/walletClient.js @@ -222,6 +222,25 @@ class WalletClient { } } + async isTransactionPending(tx_id) { + try { + const txnDetails = await this.getTransactionInfo({ + transaction_ids: [tx_id.toString()], + }); + if ( + transactionStatus().indexOf(txnDetails.transactions[0].status) == 2 && + txnDetails.transactions[0].valid + ) { + return true; + } else { + return false; + } + } catch (err) { + // Any error here must be treated as if the required status was not achieved + return false; + } + } + async isTransactionAtLeastCompleted(tx_id) { try { const txnDetails = await this.getTransactionInfo({ @@ -352,6 +371,22 @@ class WalletClient { num_node_connections: +resp.num_node_connections, }; } + async cancelTransaction(tx_id) { + try { + const result = await this.client.cancelTransaction({ + tx_id: tx_id, + }); + return { + success: result.is_success, + failure_message: result.failure_message, + }; + } catch (err) { + return { + success: false, + failure_message: err, + }; + } + } } module.exports = WalletClient;