diff --git a/applications/tari_app_grpc/proto/wallet.proto b/applications/tari_app_grpc/proto/wallet.proto index 7fd6d3606f..bc561f0e02 100644 --- a/applications/tari_app_grpc/proto/wallet.proto +++ b/applications/tari_app_grpc/proto/wallet.proto @@ -50,6 +50,8 @@ service Wallet { rpc GetNetworkStatus(Empty) returns (NetworkStatusResponse); // List currently connected peers rpc ListConnectedPeers(Empty) returns (ListConnectedPeersResponse); + // Cancel pending transaction + rpc CancelTransaction (CancelTransactionRequest) returns (CancelTransactionResponse); } message GetVersionRequest { } @@ -185,3 +187,11 @@ message ImportUtxosResponse { repeated uint64 tx_ids = 1; } +message CancelTransactionRequest { + uint64 tx_id = 1; +} + +message CancelTransactionResponse { + bool is_success = 1; + string failure_message = 2; +} \ No newline at end of file diff --git a/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs b/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs index 34797d8d92..b7e53ae6a2 100644 --- a/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs +++ b/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs @@ -393,6 +393,33 @@ impl wallet_server::Wallet for WalletGrpcServer { Ok(Response::new(resp)) } + + async fn cancel_transaction( + &self, + request: Request, + ) -> Result, Status> { + let message = request.into_inner(); + debug!( + target: LOG_TARGET, + "Incoming gRPC request to Cancel Transaction (TxId: {})", message.tx_id, + ); + let mut transaction_service = self.get_transaction_service(); + + match transaction_service.cancel_transaction(message.tx_id).await { + Ok(_) => { + return Ok(Response::new(tari_rpc::CancelTransactionResponse { + is_success: true, + failure_message: "".to_string(), + })) + }, + Err(e) => { + return Ok(Response::new(tari_rpc::CancelTransactionResponse { + is_success: false, + failure_message: e.to_string(), + })) + }, + } + } } fn convert_wallet_transaction_into_transaction_info( diff --git a/base_layer/wallet/src/output_manager_service/handle.rs b/base_layer/wallet/src/output_manager_service/handle.rs index 8b4ae008f7..367ce3d799 100644 --- a/base_layer/wallet/src/output_manager_service/handle.rs +++ b/base_layer/wallet/src/output_manager_service/handle.rs @@ -76,6 +76,7 @@ pub enum OutputManagerRequest { ScanForRecoverableOutputs(Vec), ScanOutputs(Vec), AddKnownOneSidedPaymentScript(KnownOneSidedPaymentScript), + ReinstateCancelledInboundTx(TxId), } impl fmt::Display for OutputManagerRequest { @@ -115,6 +116,7 @@ impl fmt::Display for OutputManagerRequest { ScanForRecoverableOutputs(_) => write!(f, "ScanForRecoverableOutputs"), ScanOutputs(_) => write!(f, "ScanRewindAndImportOutputs"), AddKnownOneSidedPaymentScript(_) => write!(f, "AddKnownOneSidedPaymentScript"), + ReinstateCancelledInboundTx(_) => write!(f, "ReinstateCancelledInboundTx"), } } } @@ -149,6 +151,7 @@ pub enum OutputManagerResponse { RewoundOutputs(Vec), ScanOutputs(Vec), AddKnownOneSidedPaymentScript, + ReinstatedCancelledInboundTx, } pub type OutputManagerEventSender = broadcast::Sender>; @@ -545,4 +548,15 @@ impl OutputManagerHandle { _ => Err(OutputManagerError::UnexpectedApiResponse), } } + + pub async fn reinstate_cancelled_inbound_transaction(&mut self, tx_id: TxId) -> Result<(), OutputManagerError> { + match self + .handle + .call(OutputManagerRequest::ReinstateCancelledInboundTx(tx_id)) + .await?? 
+ { + OutputManagerResponse::ReinstatedCancelledInboundTx => Ok(()), + _ => Err(OutputManagerError::UnexpectedApiResponse), + } + } } diff --git a/base_layer/wallet/src/output_manager_service/service.rs b/base_layer/wallet/src/output_manager_service/service.rs index 94d9ef775b..d87e473185 100644 --- a/base_layer/wallet/src/output_manager_service/service.rs +++ b/base_layer/wallet/src/output_manager_service/service.rs @@ -40,6 +40,7 @@ use crate::{ types::{HashDigest, ValidationRetryStrategy}, }; use blake2::Digest; +use chrono::Utc; use diesel::result::{DatabaseErrorKind, Error as DieselError}; use futures::{pin_mut, StreamExt}; use log::*; @@ -344,6 +345,10 @@ where TBackend: OutputManagerBackend + 'static .add_known_script(known_script) .await .map(|_| OutputManagerResponse::AddKnownOneSidedPaymentScript), + OutputManagerRequest::ReinstateCancelledInboundTx(tx_id) => self + .reinstate_cancelled_inbound_transaction(tx_id) + .await + .map(|_| OutputManagerResponse::ReinstatedCancelledInboundTx), } } @@ -897,6 +902,27 @@ where TBackend: OutputManagerBackend + 'static Ok(self.resources.db.cancel_pending_transaction_outputs(tx_id).await?) } + /// Restore the pending transaction encumberance and output for an inbound transaction that was previously + /// cancelled. + async fn reinstate_cancelled_inbound_transaction(&mut self, tx_id: TxId) -> Result<(), OutputManagerError> { + self.resources.db.reinstate_inbound_output(tx_id).await?; + + self.resources + .db + .add_pending_transaction_outputs(PendingTransactionOutputs { + tx_id, + outputs_to_be_spent: Vec::new(), + outputs_to_be_received: Vec::new(), + timestamp: Utc::now().naive_utc(), + coinbase_block_height: None, + }) + .await?; + + self.confirm_encumberance(tx_id).await?; + + Ok(()) + } + /// Go through the pending transaction and if any have existed longer than the specified duration, cancel them async fn timeout_pending_transactions(&mut self, period: Duration) -> Result<(), OutputManagerError> { Ok(self.resources.db.timeout_pending_transaction_outputs(period).await?) 
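The handle and service changes above give the rest of the wallet a two-step way to undo a cancellation on the output-manager side: `cancel_transaction` marks the transaction's pending output as `CancelledInbound`, and the new `reinstate_cancelled_inbound_transaction` call flips it back to `EncumberedToBeReceived` and re-registers the pending transaction. The sketch below shows the intended call sequence against an `OutputManagerHandle`; it mirrors the integration test added later in this patch, and the import paths and the way the handle is obtained are assumptions based on the module layout visible in the diff rather than a verified snippet.

```rust
use tari_wallet::output_manager_service::{
    error::OutputManagerError,
    handle::OutputManagerHandle,
    TxId,
};

/// Cancel a pending inbound transaction and later restore it, e.g. because the
/// counterparty's finalized message arrives after the cancellation.
async fn cancel_then_reinstate(oms: &mut OutputManagerHandle, tx_id: TxId) -> Result<(), OutputManagerError> {
    // Existing API: moves the transaction's pending output to CancelledInbound.
    oms.cancel_transaction(tx_id).await?;

    // New in this patch: finds the single CancelledInbound output for `tx_id`,
    // sets it back to EncumberedToBeReceived and re-adds the pending
    // transaction entry, so the value is reported as pending incoming again.
    oms.reinstate_cancelled_inbound_transaction(tx_id).await?;

    Ok(())
}
```

Note that the service re-creates the `PendingTransactionOutputs` entry with empty output lists; the reinstated output itself is tracked purely by its status, which `reinstate_inbound_output` sets back to `EncumberedToBeReceived` in the backing store.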
diff --git a/base_layer/wallet/src/output_manager_service/storage/database.rs b/base_layer/wallet/src/output_manager_service/storage/database.rs index 9d21785c20..52d552e016 100644 --- a/base_layer/wallet/src/output_manager_service/storage/database.rs +++ b/base_layer/wallet/src/output_manager_service/storage/database.rs @@ -23,7 +23,7 @@ use crate::output_manager_service::{ error::OutputManagerStorageError, service::Balance, - storage::models::{DbUnblindedOutput, KnownOneSidedPaymentScript}, + storage::models::{DbUnblindedOutput, KnownOneSidedPaymentScript, OutputStatus}, TxId, }; use aes_gcm::Aes256Gcm; @@ -135,6 +135,7 @@ pub enum DbKey { KeyManagerState, InvalidOutputs, KnownOneSidedPaymentScripts, + OutputsByTxIdAndStatus(TxId, OutputStatus), } #[derive(Debug)] @@ -149,6 +150,7 @@ pub enum DbValue { KeyManagerState(KeyManagerState), KnownOneSidedPaymentScripts(Vec), AnyOutput(Box), + AnyOutputs(Vec), } pub enum DbKeyValuePair { @@ -158,6 +160,7 @@ pub enum DbKeyValuePair { PendingTransactionOutputs(TxId, Box), KeyManagerState(KeyManagerState), KnownOneSidedPaymentScripts(KnownOneSidedPaymentScript), + UpdateOutputStatus(Commitment, OutputStatus), } pub enum WriteOperation { @@ -710,6 +713,48 @@ where T: OutputManagerBackend + 'static .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) } + + /// Check if a single cancelled inbound output exists that matches this TxID, if it does then return its status to + /// EncumberedToBeReceived + pub async fn reinstate_inbound_output(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError> { + let db_clone = self.db.clone(); + let outputs = tokio::task::spawn_blocking(move || { + match db_clone.fetch(&DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound)) { + Ok(None) => Err(OutputManagerStorageError::ValueNotFound), + Ok(Some(DbValue::AnyOutputs(o))) => Ok(o), + Ok(Some(other)) => unexpected_result( + DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound), + other, + ), + Err(e) => log_error(DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound), e), + } + }) + .await + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string())) + .and_then(|inner_result| inner_result)?; + + if outputs.len() != 1 { + return Err(OutputManagerStorageError::UnexpectedResult( + "There should be only 1 output for a cancelled inbound transaction but more were found".to_string(), + )); + } + let db_clone2 = self.db.clone(); + + tokio::task::spawn_blocking(move || { + db_clone2.write(WriteOperation::Insert(DbKeyValuePair::UpdateOutputStatus( + outputs + .first() + .expect("Must be only one element in outputs") + .commitment + .clone(), + OutputStatus::EncumberedToBeReceived, + ))) + }) + .await + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; + + Ok(()) + } } fn unexpected_result(req: DbKey, res: DbValue) -> Result { @@ -734,6 +779,7 @@ impl Display for DbKey { DbKey::TimeLockedUnspentOutputs(_t) => f.write_str(&"Timelocked Outputs"), DbKey::KnownOneSidedPaymentScripts => f.write_str(&"Known claiming scripts"), DbKey::AnyOutputByCommitment(_) => f.write_str(&"AnyOutputByCommitment"), + DbKey::OutputsByTxIdAndStatus(_, _) => f.write_str(&"OutputsByTxIdAndStatus"), } } } @@ -751,6 +797,7 @@ impl Display for DbValue { DbValue::InvalidOutputs(_) => f.write_str("Invalid Outputs"), DbValue::KnownOneSidedPaymentScripts(_) => f.write_str(&"Known claiming scripts"), DbValue::AnyOutput(_) => f.write_str(&"Any Output"), + 
DbValue::AnyOutputs(_) => f.write_str(&"Any Outputs"), } } } diff --git a/base_layer/wallet/src/output_manager_service/storage/models.rs b/base_layer/wallet/src/output_manager_service/storage/models.rs index f276f1d2df..dd36eb6934 100644 --- a/base_layer/wallet/src/output_manager_service/storage/models.rs +++ b/base_layer/wallet/src/output_manager_service/storage/models.rs @@ -105,3 +105,14 @@ impl PartialEq for KnownOneSidedPaymentScript { self.script_hash == other.script_hash } } + +/// The status of a given output +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum OutputStatus { + Unspent, + Spent, + EncumberedToBeReceived, + EncumberedToBeSpent, + Invalid, + CancelledInbound, +} diff --git a/base_layer/wallet/src/output_manager_service/storage/sqlite_db.rs b/base_layer/wallet/src/output_manager_service/storage/sqlite_db.rs index df5b2c89da..6ce5b4b80b 100644 --- a/base_layer/wallet/src/output_manager_service/storage/sqlite_db.rs +++ b/base_layer/wallet/src/output_manager_service/storage/sqlite_db.rs @@ -33,7 +33,7 @@ use crate::{ PendingTransactionOutputs, WriteOperation, }, - models::{DbUnblindedOutput, KnownOneSidedPaymentScript}, + models::{DbUnblindedOutput, KnownOneSidedPaymentScript, OutputStatus}, }, TxId, }, @@ -105,6 +105,7 @@ impl OutputManagerSqliteDatabase { } } impl OutputManagerBackend for OutputManagerSqliteDatabase { + #[allow(clippy::cognitive_complexity)] fn fetch(&self, key: &DbKey) -> Result, OutputManagerStorageError> { let conn = self.database_connection.acquire_lock(); @@ -135,6 +136,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { None }, }, + DbKey::AnyOutputByCommitment(commitment) => { match OutputSql::find_by_commitment(&commitment.to_vec(), &(*conn)) { Ok(mut o) => { @@ -173,6 +175,18 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { None }, }, + DbKey::OutputsByTxIdAndStatus(tx_id, status) => { + let mut outputs = OutputSql::find_by_tx_id_and_status(*tx_id, *status, &(*conn))?; + for o in outputs.iter_mut() { + self.decrypt_if_necessary(o)?; + } + Some(DbValue::AnyOutputs( + outputs + .iter() + .map(|o| DbUnblindedOutput::try_from(o.clone())) + .collect::, _>>()?, + )) + }, DbKey::UnspentOutputs => { let mut outputs = OutputSql::index_status(OutputStatus::Unspent, &(*conn))?; for o in outputs.iter_mut() { @@ -273,6 +287,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { Ok(result) } + #[allow(clippy::cognitive_complexity)] fn write(&self, op: WriteOperation) -> Result, OutputManagerStorageError> { let conn = self.database_connection.acquire_lock(); @@ -337,6 +352,20 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { self.encrypt_if_necessary(&mut script_sql)?; script_sql.commit(&(*conn))? 
}, + DbKeyValuePair::UpdateOutputStatus(commitment, status) => { + let output = OutputSql::find_by_commitment(&commitment.to_vec(), &(*conn))?; + output.update( + UpdateOutput { + status: Some(status), + tx_id: None, + spending_key: None, + script_private_key: None, + metadata_signature_nonce: None, + metadata_signature_u_key: None, + }, + &(*conn), + )?; + }, }, WriteOperation::Remove(k) => match k { DbKey::SpentOutput(s) => match OutputSql::find_status(&s.to_vec(), OutputStatus::Spent, &(*conn)) { @@ -409,6 +438,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { DbKey::InvalidOutputs => return Err(OutputManagerStorageError::OperationNotSupported), DbKey::TimeLockedUnspentOutputs(_) => return Err(OutputManagerStorageError::OperationNotSupported), DbKey::KnownOneSidedPaymentScripts => return Err(OutputManagerStorageError::OperationNotSupported), + DbKey::OutputsByTxIdAndStatus(_, _) => return Err(OutputManagerStorageError::OperationNotSupported), }, } @@ -840,17 +870,6 @@ fn pending_transaction_outputs_from_sql_outputs( }) } -/// The status of a given output -#[derive(PartialEq)] -enum OutputStatus { - Unspent, - Spent, - EncumberedToBeReceived, - EncumberedToBeSpent, - Invalid, - CancelledInbound, -} - impl TryFrom for OutputStatus { type Error = OutputManagerStorageError; @@ -1011,6 +1030,17 @@ impl OutputSql { Ok(request.first::(conn)?) } + pub fn find_by_tx_id_and_status( + tx_id: TxId, + status: OutputStatus, + conn: &SqliteConnection, + ) -> Result, OutputManagerStorageError> { + Ok(outputs::table + .filter(outputs::tx_id.eq(Some(tx_id as i64))) + .filter(outputs::status.eq(status as i32)) + .load(conn)?) + } + /// Find outputs via tx_id that are encumbered. Any outputs that are encumbered cannot be marked as spent. pub fn find_by_tx_id_and_encumbered( tx_id: TxId, diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index 2068d96eac..17cc84c7df 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -377,7 +377,11 @@ where "Handling Transaction Finalized Message, Trace: {}", msg.dht_header.message_tag.as_value() ); - let result = self.accept_finalized_transaction(origin_public_key, inner_msg, ).await; + let result = self.accept_finalized_transaction( + origin_public_key, + inner_msg, + &mut receive_transaction_protocol_handles, + ).await; match result { Err(TransactionServiceError::TransactionDoesNotExistError) => { @@ -1300,6 +1304,7 @@ where &mut self, source_pubkey: CommsPublicKey, finalized_transaction: proto::TransactionFinalizedMessage, + join_handles: &mut FuturesUnordered>>, ) -> Result<(), TransactionServiceError> { let tx_id = finalized_transaction.tx_id; let transaction: Transaction = finalized_transaction @@ -1317,7 +1322,39 @@ where })?; let sender = match self.finalized_transaction_senders.get_mut(&tx_id) { - None => return Err(TransactionServiceError::TransactionDoesNotExistError), + None => { + // First check if perhaps we know about this inbound transaction but it was cancelled + match self.db.get_cancelled_pending_inbound_transaction(tx_id).await { + Ok(t) => { + if t.source_public_key != source_pubkey { + debug!( + target: LOG_TARGET, + "Received Finalized Transaction for a cancelled pending Inbound Transaction (TxId: \ + {}) but Source Public Key did not match", + tx_id + ); + return Err(TransactionServiceError::TransactionDoesNotExistError); + } + info!( + target: LOG_TARGET, + "Received Finalized Transaction 
for a cancelled pending Inbound Transaction (TxId: {}). \ + Restarting protocol", + tx_id + ); + self.db.uncancel_pending_transaction(tx_id).await?; + self.output_manager_service + .reinstate_cancelled_inbound_transaction(tx_id) + .await?; + + self.restart_receive_transaction_protocol(tx_id, source_pubkey.clone(), join_handles); + match self.finalized_transaction_senders.get_mut(&tx_id) { + None => return Err(TransactionServiceError::TransactionDoesNotExistError), + Some(s) => s, + } + }, + Err(_) => return Err(TransactionServiceError::TransactionDoesNotExistError), + } + }, Some(s) => s, }; @@ -1401,34 +1438,43 @@ where ) -> Result<(), TransactionServiceError> { let inbound_txs = self.db.get_pending_inbound_transactions().await?; for (tx_id, tx) in inbound_txs { - if !self.pending_transaction_reply_senders.contains_key(&tx_id) { - debug!( - target: LOG_TARGET, - "Restarting listening for Transaction Finalize for Pending Inbound Transaction TxId: {}", tx_id - ); - let (tx_finalized_sender, tx_finalized_receiver) = mpsc::channel(100); - let (cancellation_sender, cancellation_receiver) = oneshot::channel(); - self.finalized_transaction_senders.insert(tx_id, tx_finalized_sender); - self.receiver_transaction_cancellation_senders - .insert(tx_id, cancellation_sender); - let protocol = TransactionReceiveProtocol::new( - tx_id, - tx.source_public_key, - TransactionSenderMessage::None, - TransactionReceiveProtocolStage::WaitForFinalize, - self.resources.clone(), - tx_finalized_receiver, - cancellation_receiver, - ); - - let join_handle = tokio::spawn(protocol.execute()); - join_handles.push(join_handle); - } + self.restart_receive_transaction_protocol(tx_id, tx.source_public_key.clone(), join_handles); } Ok(()) } + fn restart_receive_transaction_protocol( + &mut self, + tx_id: TxId, + source_public_key: CommsPublicKey, + join_handles: &mut FuturesUnordered>>, + ) { + if !self.pending_transaction_reply_senders.contains_key(&tx_id) { + debug!( + target: LOG_TARGET, + "Restarting listening for Transaction Finalize for Pending Inbound Transaction TxId: {}", tx_id + ); + let (tx_finalized_sender, tx_finalized_receiver) = mpsc::channel(100); + let (cancellation_sender, cancellation_receiver) = oneshot::channel(); + self.finalized_transaction_senders.insert(tx_id, tx_finalized_sender); + self.receiver_transaction_cancellation_senders + .insert(tx_id, cancellation_sender); + let protocol = TransactionReceiveProtocol::new( + tx_id, + source_public_key, + TransactionSenderMessage::None, + TransactionReceiveProtocolStage::WaitForFinalize, + self.resources.clone(), + tx_finalized_receiver, + cancellation_receiver, + ); + + let join_handle = tokio::spawn(protocol.execute()); + join_handles.push(join_handle); + } + } + /// Add a base node public key to the list that will be used to broadcast transactions and monitor the base chain /// for the presence of spendable outputs. 
If this is the first time the base node public key is set do the initial /// mempool broadcast diff --git a/base_layer/wallet/src/transaction_service/storage/database.rs b/base_layer/wallet/src/transaction_service/storage/database.rs index b6844a7c44..df03327e19 100644 --- a/base_layer/wallet/src/transaction_service/storage/database.rs +++ b/base_layer/wallet/src/transaction_service/storage/database.rs @@ -90,8 +90,12 @@ pub trait TransactionBackend: Send + Sync + Clone { fn set_completed_transaction_validity(&self, tx_id: TxId, valid: bool) -> Result<(), TransactionStorageError>; /// Cancel Completed transaction, this will update the transaction status fn cancel_completed_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError>; - /// Cancel Completed transaction, this will update the transaction status - fn cancel_pending_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError>; + /// Set cancellation on Pending transaction, this will update the transaction status + fn set_pending_transaction_cancellation_status( + &self, + tx_id: TxId, + cancelled: bool, + ) -> Result<(), TransactionStorageError>; /// Search all pending transaction for the provided tx_id and if it exists return the public key of the counterparty fn get_pending_transaction_counterparty_pub_key_by_tx_id( &self, @@ -563,7 +567,15 @@ where T: TransactionBackend + 'static pub async fn cancel_pending_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || db_clone.cancel_pending_transaction(tx_id)) + tokio::task::spawn_blocking(move || db_clone.set_pending_transaction_cancellation_status(tx_id, true)) + .await + .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + Ok(()) + } + + pub async fn uncancel_pending_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { + let db_clone = self.db.clone(); + tokio::task::spawn_blocking(move || db_clone.set_pending_transaction_cancellation_status(tx_id, false)) .await .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; Ok(()) diff --git a/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs b/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs index 841eb96510..f4e77e342d 100644 --- a/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs +++ b/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs @@ -576,16 +576,20 @@ impl TransactionBackend for TransactionServiceSqliteDatabase { Ok(()) } - fn cancel_pending_transaction(&self, tx_id: u64) -> Result<(), TransactionStorageError> { + fn set_pending_transaction_cancellation_status( + &self, + tx_id: u64, + cancelled: bool, + ) -> Result<(), TransactionStorageError> { let conn = self.database_connection.acquire_lock(); - match InboundTransactionSql::find_by_cancelled(tx_id, false, &(*conn)) { + match InboundTransactionSql::find(tx_id, &(*conn)) { Ok(v) => { - v.cancel(&(*conn))?; + v.set_cancelled(cancelled, &(*conn))?; }, Err(_) => { - match OutboundTransactionSql::find_by_cancelled(tx_id, false, &(*conn)) { + match OutboundTransactionSql::find(tx_id, &(*conn)) { Ok(v) => { - v.cancel(&(*conn))?; + v.set_cancelled(cancelled, &(*conn))?; }, Err(TransactionStorageError::DieselError(DieselError::NotFound)) => { return Err(TransactionStorageError::ValuesNotFound); @@ -1019,10 +1023,10 @@ impl InboundTransactionSql { Ok(()) } - pub fn cancel(&self, conn: &SqliteConnection) -> Result<(), TransactionStorageError> { 
+ pub fn set_cancelled(&self, cancelled: bool, conn: &SqliteConnection) -> Result<(), TransactionStorageError> { self.update( UpdateInboundTransactionSql { - cancelled: Some(1i32), + cancelled: Some(cancelled as i32), direct_send_success: None, receiver_protocol: None, send_count: None, @@ -1202,10 +1206,10 @@ impl OutboundTransactionSql { Ok(()) } - pub fn cancel(&self, conn: &SqliteConnection) -> Result<(), TransactionStorageError> { + pub fn set_cancelled(&self, cancelled: bool, conn: &SqliteConnection) -> Result<(), TransactionStorageError> { self.update( UpdateOutboundTransactionSql { - cancelled: Some(1i32), + cancelled: Some(cancelled as i32), direct_send_success: None, sender_protocol: None, send_count: None, @@ -1986,23 +1990,34 @@ mod test { assert!(InboundTransactionSql::find_by_cancelled(inbound_tx1.tx_id, true, &conn).is_err()); InboundTransactionSql::try_from(inbound_tx1.clone()) .unwrap() - .cancel(&conn) + .set_cancelled(true, &conn) .unwrap(); assert!(InboundTransactionSql::find_by_cancelled(inbound_tx1.tx_id, false, &conn).is_err()); assert!(InboundTransactionSql::find_by_cancelled(inbound_tx1.tx_id, true, &conn).is_ok()); - + InboundTransactionSql::try_from(inbound_tx1.clone()) + .unwrap() + .set_cancelled(false, &conn) + .unwrap(); + assert!(InboundTransactionSql::find_by_cancelled(inbound_tx1.tx_id, true, &conn).is_err()); + assert!(InboundTransactionSql::find_by_cancelled(inbound_tx1.tx_id, false, &conn).is_ok()); OutboundTransactionSql::try_from(outbound_tx1.clone()) .unwrap() .commit(&conn) .unwrap(); assert!(OutboundTransactionSql::find_by_cancelled(outbound_tx1.tx_id, true, &conn).is_err()); - OutboundTransactionSql::try_from(outbound_tx1) + OutboundTransactionSql::try_from(outbound_tx1.clone()) .unwrap() - .cancel(&conn) + .set_cancelled(true, &conn) .unwrap(); - assert!(InboundTransactionSql::find_by_cancelled(inbound_tx1.tx_id, false, &conn).is_err()); - assert!(InboundTransactionSql::find_by_cancelled(inbound_tx1.tx_id, true, &conn).is_ok()); + assert!(OutboundTransactionSql::find_by_cancelled(outbound_tx1.tx_id, false, &conn).is_err()); + assert!(OutboundTransactionSql::find_by_cancelled(outbound_tx1.tx_id, true, &conn).is_ok()); + OutboundTransactionSql::try_from(outbound_tx1.clone()) + .unwrap() + .set_cancelled(false, &conn) + .unwrap(); + assert!(OutboundTransactionSql::find_by_cancelled(outbound_tx1.tx_id, true, &conn).is_err()); + assert!(OutboundTransactionSql::find_by_cancelled(outbound_tx1.tx_id, false, &conn).is_ok()); CompletedTransactionSql::try_from(completed_tx1.clone()) .unwrap() diff --git a/base_layer/wallet/tests/output_manager_service/service.rs b/base_layer/wallet/tests/output_manager_service/service.rs index 71ec0d75d4..725e48bbaa 100644 --- a/base_layer/wallet/tests/output_manager_service/service.rs +++ b/base_layer/wallet/tests/output_manager_service/service.rs @@ -84,6 +84,7 @@ use tari_wallet::{ types::ValidationRetryStrategy, }; +use tari_wallet::output_manager_service::storage::models::OutputStatus; use tokio::{ runtime::Runtime, sync::{broadcast, broadcast::channel}, @@ -860,6 +861,59 @@ fn cancel_transaction() { assert_eq!(runtime.block_on(oms.get_unspent_outputs()).unwrap().len(), num_outputs); } +#[test] +fn cancel_transaction_and_reinstate_inbound_tx() { + let mut runtime = Runtime::new().unwrap(); + + let (connection, _tempdir) = get_temp_sqlite_database_connection(); + let backend = OutputManagerSqliteDatabase::new(connection, None); + + let (mut oms, _shutdown, _, _, _, _, _) = setup_output_manager_service(&mut runtime, 
backend.clone(), true); + + let value = MicroTari::from(5000); + let (tx_id, sender_message) = generate_sender_transaction_message(value); + let _rtp = runtime.block_on(oms.get_recipient_transaction(sender_message)).unwrap(); + assert_eq!(runtime.block_on(oms.get_unspent_outputs()).unwrap().len(), 0); + + let pending_txs = runtime.block_on(oms.get_pending_transactions()).unwrap(); + + assert_eq!(pending_txs.len(), 1); + + let output = pending_txs + .get(&tx_id) + .unwrap() + .outputs_to_be_received + .first() + .unwrap() + .clone(); + + runtime.block_on(oms.cancel_transaction(tx_id)).unwrap(); + + let cancelled_output = backend + .fetch(&DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound)) + .unwrap() + .unwrap(); + + if let DbValue::AnyOutputs(o) = cancelled_output { + let o = o.first().expect("Should be one output in here"); + assert_eq!(o.commitment, output.commitment); + } else { + panic!("Should have found cancelled output"); + } + + assert_eq!(runtime.block_on(oms.get_pending_transactions()).unwrap().len(), 0); + + runtime + .block_on(oms.reinstate_cancelled_inbound_transaction(tx_id)) + .unwrap(); + + assert_eq!(runtime.block_on(oms.get_pending_transactions()).unwrap().len(), 1); + + let balance = runtime.block_on(oms.get_balance()).unwrap(); + + assert_eq!(balance.pending_incoming_balance, value); +} + #[test] fn timeout_transaction() { let factories = CryptoFactories::default(); diff --git a/clients/wallet_grpc_client/index.js b/clients/wallet_grpc_client/index.js index 15347770d2..d638ba1f5c 100644 --- a/clients/wallet_grpc_client/index.js +++ b/clients/wallet_grpc_client/index.js @@ -39,6 +39,7 @@ function Client(address) { "importUtxos", "listConnectedPeers", "getNetworkStatus", + "cancelTransaction", ]; this.waitForReady = (...args) => { diff --git a/integration_tests/features/Propagation.feature b/integration_tests/features/Propagation.feature index 339b9afc72..b1cc97eaeb 100644 --- a/integration_tests/features/Propagation.feature +++ b/integration_tests/features/Propagation.feature @@ -81,7 +81,7 @@ Feature: Block Propagation When I wait 10 seconds And mining node MINER mines 5 blocks When I wait 100 seconds - When I start LAG1 + When I start base node LAG1 # Wait for node to so start and get into listening mode When I wait 100 seconds Then node MINER is at height 6 diff --git a/integration_tests/features/Reorgs.feature b/integration_tests/features/Reorgs.feature index 415cd42e04..218201b2a0 100644 --- a/integration_tests/features/Reorgs.feature +++ b/integration_tests/features/Reorgs.feature @@ -57,10 +57,10 @@ Feature: Reorgs When mining node MINING1 mines 3 blocks with min difficulty 1 and max difficulty 20 And node NODE1 is at height 17 And I stop node NODE1 - And I start PNODE2 + And I start base node PNODE2 When mining node MINING2 mines 6 blocks with min difficulty 20 and max difficulty 1000000 And node PNODE2 is at height 20 - When I start NODE1 + When I start base node NODE1 Then all nodes are at height 20 @critical @reorg @@ -77,7 +77,7 @@ Feature: Reorgs And mining node MINING2 mines 19 blocks with min difficulty 20 and max difficulty 1000000 And node NODE2 is at height 20 And I stop node NODE2 - When I start NODE1 + When I start base node NODE1 And mining node MINING1 mines 3 blocks with min difficulty 1 and max difficulty 20 And node NODE1 is at height 4 When I create a transaction TX1 spending CB1 to UTX1 @@ -87,7 +87,7 @@ Feature: Reorgs And node NODE1 is at height 10 Given I have a pruned node PNODE1 connected to node NODE1 with pruning 
horizon set to 5 Then node PNODE1 is at height 10 - When I start NODE2 + When I start base node NODE2 Then all nodes are at height 20 # Because TX1 should have been re_orged out we should be able to spend CB1 again When I create a transaction TX2 spending CB1 to UTX2 diff --git a/integration_tests/features/Sync.feature b/integration_tests/features/Sync.feature index 9428c34c01..497a59e6f6 100644 --- a/integration_tests/features/Sync.feature +++ b/integration_tests/features/Sync.feature @@ -59,10 +59,10 @@ Feature: Block Sync And mining node MINER1 mines 5 blocks with min difficulty 1 and max difficulty 1 Then node NODE1 is at height 10 Given I stop node NODE1 - And I start NODE2 + And I start base node NODE2 And mining node MINER2 mines 7 blocks with min difficulty 11 and max difficulty 100000 Then node NODE2 is at height 12 - When I start NODE1 + When I start base node NODE1 Then all nodes are on the same chain at height 12 @critical @reorg @long-running @@ -79,10 +79,10 @@ Feature: Block Sync And mining node MINER1 mines 1001 blocks with min difficulty 1 and max difficulty 10 Then node NODE1 is at height 1006 Given I stop node NODE1 - And I start NODE2 + And I start base node NODE2 And mining node MINER2 mines 1500 blocks with min difficulty 11 and max difficulty 100000 Then node NODE2 is at height 1505 - When I start NODE1 + When I start base node NODE1 Then all nodes are on the same chain at height 1505 @critical @@ -114,7 +114,7 @@ Feature: Block Sync When I mine 5 blocks on NODE2 Then node NODE2 is at height 5 Then node PNODE2 is at height 40 - When I start NODE1 + When I start base node NODE1 # We need for node to boot up and supply node 2 with blocks And I connect node NODE2 to node NODE1 and wait 60 seconds Then all nodes are at height 40 @@ -129,7 +129,7 @@ Feature: Block Sync And I stop node SYNCER When mining node MINER mines blocks with min difficulty 1 and max difficulty 9999999999 Then node SEED is at height - When I start SYNCER + When I start base node SYNCER # Try to mine much faster than block sync, but still producing a lower accumulated difficulty And mining node MINER2 mines blocks with min difficulty 1 and max difficulty 10 # Allow reorg to filter through diff --git a/integration_tests/features/WalletTransactions.feature b/integration_tests/features/WalletTransactions.feature index 4f40a3e317..44f9eb6121 100644 --- a/integration_tests/features/WalletTransactions.feature +++ b/integration_tests/features/WalletTransactions.feature @@ -161,4 +161,44 @@ Feature: Wallet Transactions Then I wait for wallet WALLET_B to have at least 500000 uT Then I check if wallet WALLET_B has 5 transactions Then I restart wallet WALLET_B - Then I check if wallet WALLET_B has 5 transactions \ No newline at end of file + Then I check if wallet WALLET_B has 5 transactions + + @critical + Scenario: Wallet SAF negotiation and cancellation with offline peers + Given I have a seed node NODE + And I have 1 base nodes connected to all seed nodes + And I have wallet WALLET_A connected to all seed nodes + And I have mining node MINER connected to base node NODE and wallet WALLET_A + And mining node MINER mines 5 blocks + Then all nodes are at height 5 + Then I wait for wallet WALLET_A to have at least 10000000000 uT + And I have non-default wallet WALLET_SENDER connected to all seed nodes using StoreAndForwardOnly + And I send 100000000 uT from wallet WALLET_A to wallet WALLET_SENDER at fee 100 + When wallet WALLET_SENDER detects all transactions are at least Broadcast + And mining node MINER mines 
5 blocks + Then all nodes are at height 10 + Then I wait for wallet WALLET_SENDER to have at least 100000000 uT + And I have wallet WALLET_RECV connected to all seed nodes + And I stop wallet WALLET_RECV + And I send 1000000 uT from wallet WALLET_SENDER to wallet WALLET_RECV at fee 100 + When wallet WALLET_SENDER detects last transaction is Pending + Then I stop wallet WALLET_SENDER + And I start wallet WALLET_RECV + And I wait for 5 seconds + When wallet WALLET_RECV detects all transactions are at least Pending + Then I cancel last transaction in wallet WALLET_RECV + Then I stop wallet WALLET_RECV + And I start wallet WALLET_SENDER + # This is a weirdness that I haven't been able to figure out. When you start WALLET_SENDER on the line above it + # requests SAF messages from the base nodes; the base nodes get the request and attempt to send the stored messages + # but the connection fails. It requires a second reconnection and request for the SAF messages to be delivered. + And I wait for 10 seconds + Then I restart wallet WALLET_SENDER + When wallet WALLET_SENDER detects all transactions are at least Broadcast + And mining node MINER mines 5 blocks + Then all nodes are at height 15 + When wallet WALLET_SENDER detects all transactions as Mined_Confirmed + And I start wallet WALLET_RECV + And I wait for 10 seconds + Then I restart wallet WALLET_RECV + Then I wait for wallet WALLET_RECV to have at least 1000000 uT \ No newline at end of file diff --git a/integration_tests/features/support/steps.js b/integration_tests/features/support/steps.js index 166a392368..b517831c41 100644 --- a/integration_tests/features/support/steps.js +++ b/integration_tests/features/support/steps.js @@ -280,6 +280,9 @@ Given( wallet.setPeerSeeds([this.seeds[seedName].peerAddress()]); await wallet.startNew(); this.addWallet(walletName, wallet); + let walletClient = await this.getWallet(walletName).connectClient(); + let walletInfo = await walletClient.identify(); + this.addWalletPubkey(walletName, walletInfo.public_key); } ); @@ -296,6 +299,9 @@ Given( wallet.setPeerSeeds([this.seedAddresses()]); await wallet.startNew(); this.addWallet(name, wallet); + let walletClient = await this.getWallet(name).connectClient(); + let walletInfo = await walletClient.identify(); + this.addWalletPubkey(name, walletInfo.public_key); } ); @@ -344,6 +350,9 @@ Given( wallet.setPeerSeeds([this.seedAddresses()]); await wallet.startNew(); this.addWallet(name, wallet); + let walletClient = await this.getWallet(name).connectClient(); + let walletInfo = await walletClient.identify(); + this.addWalletPubkey(name, walletInfo.public_key); } ); @@ -354,35 +363,12 @@ Given( // mechanism: DirectOnly, StoreAndForwardOnly, DirectAndStoreAndForward const promises = []; for (let i = 0; i < n; i++) { - if (i < 10) { - const wallet = new WalletProcess( - "Wallet_0" + String(i), - false, - { routingMechanism: mechanism }, - this.logFilePathWallet - ); - console.log(wallet.name, wallet.options); - wallet.setPeerSeeds([this.seedAddresses()]); - promises.push( - wallet - .startNew() - .then(() => this.addWallet("Wallet_0" + String(i), wallet)) - ); - } else { - const wallet = new WalletProcess( - "Wallet_0" + String(i), - false, - { routingMechanism: mechanism }, - this.logFilePathWallet - ); - console.log(wallet.name, wallet.options); - wallet.setPeerSeeds([this.seedAddresses()]); - promises.push( - wallet - .startNew() - .then(() => this.addWallet("Wallet_" + String(i), wallet)) - ); - } + let name = "Wallet_" + String(i).padStart(2, "0"); + promises.push(
+ this.createAndAddWallet(name, [this.seedAddresses()], { + routingMechanism: mechanism, + }) + ); } await Promise.all(promises); } @@ -409,8 +395,11 @@ Given( seedWords ); walletB.setPeerSeeds([this.seedAddresses()]); - walletB.startNew(); // Do not 'await' here + await walletB.startNew(); this.addWallet(walletNameB, walletB); + let walletClient = await this.getWallet(walletNameB).connectClient(); + let walletInfo = await walletClient.identify(); + this.addWalletPubkey(walletNameB, walletInfo.public_key); } ); @@ -419,6 +408,11 @@ When(/I stop wallet (.*)/, async function (walletName) { await wallet.stop(); }); +When(/I start wallet (.*)/, async function (walletName) { + let wallet = this.getWallet(walletName); + await wallet.start(); +}); + When(/I restart wallet (.*)/, async function (walletName) { let wallet = this.getWallet(walletName); await wallet.stop(); @@ -700,7 +694,7 @@ Then("Proxy response for block header by hash is valid", function () { assert(lastResult.result.status, "OK"); }); -When(/I start (.*)/, { timeout: 20 * 1000 }, async function (name) { +When(/I start base node (.*)/, { timeout: 20 * 1000 }, async function (name) { await this.startNode(name); }); @@ -1423,7 +1417,8 @@ Then( async function send_tari( sourceWallet, - destWallet, + destWalletName, + destWalletPubkey, tariAmount, feePerGram, oneSided = false, @@ -1431,21 +1426,18 @@ async function send_tari( printMessage = true ) { const sourceWalletClient = await sourceWallet.connectClient(); - const destClient = await destWallet.connectClient(); - const destInfo = await destClient.identify(); - if (message === "") { - message = - sourceWallet.name + + console.log( + sourceWallet.name + " sending " + tariAmount + "uT one-sided(" + oneSided + ") to " + - destWallet.name + + destWalletName + " `" + - destInfo.public_key + - "`"; - } + destWalletPubkey + + "`" + ); if (printMessage) { console.log(message); } @@ -1461,7 +1453,7 @@ async function send_tari( lastResult = await sourceWalletClient.transfer({ recipients: [ { - address: destInfo.public_key, + address: destWalletPubkey, amount: tariAmount, fee_per_gram: feePerGram, message: message, @@ -1472,7 +1464,7 @@ async function send_tari( lastResult = await sourceWalletClient.transfer({ recipients: [ { - address: destInfo.public_key, + address: destWalletPubkey, amount: tariAmount, fee_per_gram: feePerGram, message: message, @@ -1521,12 +1513,12 @@ When( const sourceClient = await sourceWallet.connectClient(); const sourceInfo = await sourceClient.identify(); - const destWallet = this.getWallet(dest); - const destClient = await destWallet.connectClient(); - const destInfo = await destClient.identify(); + const destPublicKey = this.getWalletPubkey(dest); + this.lastResult = await send_tari( sourceWallet, - destWallet, + dest, + destPublicKey, tariAmount, feePerGram ); @@ -1536,7 +1528,7 @@ When( this.lastResult.results[0].transaction_id ); this.addTransaction( - destInfo.public_key, + destPublicKey, this.lastResult.results[0].transaction_id ); console.log( @@ -1561,7 +1553,8 @@ When( for (let i = 0; i < number; i++) { this.lastResult = await send_tari( this.getWallet(source), - this.getWallet(dest), + destInfo.name, + destInfo.public_key, tariAmount, fee ); @@ -1595,7 +1588,8 @@ When( const destInfo = await destClient.identify(); this.lastResult = await send_tari( this.getWallet(source), - this.getWallet(wallet), + destInfo.name, + destInfo.public_key, tariAmount, fee ); @@ -1718,7 +1712,8 @@ When( const sourceInfo = await sourceClient.identify(); this.lastResult = 
await send_tari( this.getWallet(source), - this.getWallet(source), + sourceInfo.name, + sourceInfo.public_key, tariAmount, feePerGram ); @@ -1779,19 +1774,23 @@ When( /I send a one-sided transaction of (.*) uT from (.*) to (.*) at fee (.*)/, { timeout: 65 * 1000 }, async function (amount, source, dest, feePerGram) { - let wallet = this.getWallet(source); - let sourceClient = await wallet.connectClient(); + const sourceWallet = this.getWallet(source); + const sourceClient = await sourceWallet.connectClient(); + const sourceInfo = await sourceClient.identify(); + + const destPublicKey = this.getWalletPubkey(dest); const oneSided = true; const lastResult = await send_tari( - this.getWallet(source), - this.getWallet(dest), + sourceWallet, + dest, + destPublicKey, amount, feePerGram, oneSided ); expect(lastResult.results[0].is_success).to.equal(true); - const sourceInfo = await sourceClient.identify(); + this.addTransaction( sourceInfo.public_key, lastResult.results[0].transaction_id @@ -1799,6 +1798,31 @@ When( } ); +When( + /I cancel last transaction in wallet (.*)/, + { timeout: 25 * 5 * 1000 }, + async function (walletName) { + const wallet = this.getWallet(walletName); + const walletClient = await wallet.connectClient(); + + let lastTxId = this.lastResult.results[0].transaction_id; + console.log( + "Attempting to cancel transaction ", + lastTxId, + "from wallet", + walletName + ); + + let result = await walletClient.cancelTransaction(lastTxId); + console.log( + "Cancellation successful? ", + result.success, + result.failure_message + ); + assert(result.success, true); + } +); + When(/I wait (.*) seconds/, { timeout: 600 * 1000 }, async function (int) { console.log("Waiting for", int, "seconds"); await sleep(int * 1000); @@ -1960,6 +1984,36 @@ Then( } ); +Then( + /wallet (.*) detects last transaction is Pending/, + { timeout: 3800 * 1000 }, + async function (walletName) { + const wallet = this.getWallet(walletName); + const walletClient = await wallet.connectClient(); + + let lastTxId = this.lastResult.results[0].transaction_id; + console.log( + "Waiting for Transaction ", + lastTxId, + "to be pending in wallet", + walletName + ); + + await waitFor( + async () => walletClient.isTransactionPending(lastTxId), + true, + 3700 * 1000, + 5 * 1000, + 5 + ); + const transactionPending = await walletClient.isTransactionPending( + lastTxId + ); + + expect(transactionPending).to.equal(true); + } +); + Then( /wallet (.*) detects all transactions are at least Completed/, { timeout: 1200 * 1000 }, @@ -2869,7 +2923,8 @@ When( for (let i = 0; i < numTransactions; i++) { const result = await send_tari( this.getWallet(sourceWallet), - this.getWallet(destWallet), + destInfo.name, + destInfo.public_key, amount, feePerGram, false, diff --git a/integration_tests/features/support/world.js b/integration_tests/features/support/world.js index 720599eb7d..b23cd1413e 100644 --- a/integration_tests/features/support/world.js +++ b/integration_tests/features/support/world.js @@ -88,8 +88,13 @@ class CustomWorld { this.proxies[name] = process; } - async createAndAddWallet(name, nodeAddresses) { - const wallet = new WalletProcess(name, false, {}, this.logFilePathWallet); + async createAndAddWallet(name, nodeAddresses, options = {}) { + const wallet = new WalletProcess( + name, + false, + options, + this.logFilePathWallet + ); wallet.setPeerSeeds([nodeAddresses]); await wallet.startNew(); @@ -103,6 +108,10 @@ class CustomWorld { this.wallets[name] = process; } + addWalletPubkey(name, pubkey) { + this.walletPubkeys[name] = 
pubkey; + } + addOutput(name, output) { this.outputs[name] = output; } diff --git a/integration_tests/helpers/walletClient.js b/integration_tests/helpers/walletClient.js index 0940f8628e..f3a38ff35a 100644 --- a/integration_tests/helpers/walletClient.js +++ b/integration_tests/helpers/walletClient.js @@ -222,6 +222,25 @@ class WalletClient { } } + async isTransactionPending(tx_id) { + try { + const txnDetails = await this.getTransactionInfo({ + transaction_ids: [tx_id.toString()], + }); + if ( + transactionStatus().indexOf(txnDetails.transactions[0].status) == 2 && + txnDetails.transactions[0].valid + ) { + return true; + } else { + return false; + } + } catch (err) { + // Any error here must be treated as if the required status was not achieved + return false; + } + } + async isTransactionAtLeastCompleted(tx_id) { try { const txnDetails = await this.getTransactionInfo({ @@ -352,6 +371,22 @@ class WalletClient { num_node_connections: +resp.num_node_connections, }; } + async cancelTransaction(tx_id) { + try { + const result = await this.client.cancelTransaction({ + tx_id: tx_id, + }); + return { + success: result.is_success, + failure_message: result.failure_message, + }; + } catch (err) { + return { + success: false, + failure_message: err, + }; + } + } } module.exports = WalletClient;
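Finally, the new `CancelTransaction` RPC can also be exercised from Rust through the tonic-generated client for the `Wallet` service defined at the top of this patch. The snippet below is an illustrative sketch, not code from the repository: the `tari_app_grpc::tari_rpc::wallet_client` path follows tonic's usual code-generation layout, and the address argument is whatever endpoint the console wallet's gRPC server is listening on.

```rust
use tari_app_grpc::tari_rpc::{wallet_client::WalletClient, CancelTransactionRequest};

/// Ask a running console wallet to cancel a pending transaction by id.
async fn cancel_over_grpc(grpc_address: &'static str, tx_id: u64) -> Result<bool, Box<dyn std::error::Error>> {
    let mut client = WalletClient::connect(grpc_address).await?;
    let response = client
        .cancel_transaction(CancelTransactionRequest { tx_id })
        .await?
        .into_inner();
    if !response.is_success {
        eprintln!("Cancellation failed: {}", response.failure_message);
    }
    Ok(response.is_success)
}
```

As with the JavaScript helper above, a failed cancellation is not surfaced as a gRPC error: the handler added in this patch always returns a response and reports the outcome in `is_success` and `failure_message`, so callers should branch on that flag rather than on the transport result.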