From 4cbdfec945857c5b7a334962e137d2c8dc4d4c4a Mon Sep 17 00:00:00 2001 From: Hansie Odendaal <39146854+hansieodendaal@users.noreply.github.com> Date: Mon, 13 Nov 2023 15:40:15 +0200 Subject: [PATCH] feat: sender and receiver protocols use bytes (not hex string) in wallet database (#5950) Description --- Changed the sender and receiver protocol in the wallet database to use bytes instead of a hex string, as the underlying data type is encrypted bytes. This issue was highlighted due to the to_hex function in tari_utilities not being able to convert large transactions into hex strings (returned **String to large**) for saving in the wallet database. Motivation and Context --- See above. How Has This Been Tested? --- Existing unit tests and cucumber tests passed Added a new integration-level unit test `async fn large_interactive_transaction()` to test the conversion from pending outgoing and incoming transactions to and from the database. What process can a PR reviewer use to test or verify this change? --- Code walk-through Run the new unit test Breaking Changes --- - [x] None - [ ] Requires data directory on base node to be deleted - [ ] Requires hard fork - [ ] Other - Please specify Co-authored-by: SW van Heerden --- .../down.sql | 1 + .../up.sql | 36 ++++ base_layer/wallet/src/schema.rs | 26 +-- .../wallet/src/transaction_service/error.rs | 2 + .../transaction_service/storage/sqlite_db.rs | 69 +++---- .../transaction_service_tests/service.rs | 176 ++++++++++++++++++ 6 files changed, 252 insertions(+), 58 deletions(-) create mode 100644 base_layer/wallet/migrations/2023-11-13-082000_transaction_protocols/down.sql create mode 100644 base_layer/wallet/migrations/2023-11-13-082000_transaction_protocols/up.sql diff --git a/base_layer/wallet/migrations/2023-11-13-082000_transaction_protocols/down.sql b/base_layer/wallet/migrations/2023-11-13-082000_transaction_protocols/down.sql new file mode 100644 index 0000000000..291a97c5ce --- /dev/null +++ b/base_layer/wallet/migrations/2023-11-13-082000_transaction_protocols/down.sql @@ -0,0 +1 @@ +-- This file should undo anything in `up.sql` \ No newline at end of file diff --git a/base_layer/wallet/migrations/2023-11-13-082000_transaction_protocols/up.sql b/base_layer/wallet/migrations/2023-11-13-082000_transaction_protocols/up.sql new file mode 100644 index 0000000000..da9aadf29a --- /dev/null +++ b/base_layer/wallet/migrations/2023-11-13-082000_transaction_protocols/up.sql @@ -0,0 +1,36 @@ +-- Any old 'inbound_transactions' will not be valid due to the change in 'receiver_protocol' to 'BLOB', so we drop and +-- recreate the table. + +DROP TABLE inbound_transactions; +CREATE TABLE inbound_transactions +( + tx_id BIGINT PRIMARY KEY NOT NULL, + source_address BLOB NOT NULL, + amount BIGINT NOT NULL, + receiver_protocol BLOB NOT NULL, + message TEXT NOT NULL, + timestamp DATETIME NOT NULL, + cancelled INTEGER NOT NULL, + direct_send_success INTEGER DEFAULT 0 NOT NULL, + send_count INTEGER DEFAULT 0 NOT NULL, + last_send_timestamp DATETIME NULL +); + +-- Any old 'outbound_transactions' will not be valid due to the change in 'sender_protocol' to 'BLOB', so we drop and +-- recreate the table. + +DROP TABLE outbound_transactions; +CREATE TABLE outbound_transactions +( + tx_id BIGINT PRIMARY KEY NOT NULL, + destination_address BLOB NOT NULL, + amount BIGINT NOT NULL, + fee BIGINT NOT NULL, + sender_protocol BLOB NOT NULL, + message TEXT NOT NULL, + timestamp DATETIME NOT NULL, + cancelled INTEGER DEFAULT 0 NOT NULL, + direct_send_success INTEGER DEFAULT 0 NOT NULL, + send_count INTEGER DEFAULT 0 NOT NULL, + last_send_timestamp DATETIME NULL +); diff --git a/base_layer/wallet/src/schema.rs b/base_layer/wallet/src/schema.rs index c7f8294804..5d734c2795 100644 --- a/base_layer/wallet/src/schema.rs +++ b/base_layer/wallet/src/schema.rs @@ -1,4 +1,6 @@ -table! { +// @generated automatically by Diesel CLI. + +diesel::table! { burnt_proofs (id) { id -> Integer, reciprocal_claim_public_key -> Text, @@ -7,14 +9,14 @@ table! { } } -table! { +diesel::table! { client_key_values (key) { key -> Text, value -> Text, } } -table! { +diesel::table! { completed_transactions (tx_id) { tx_id -> BigInt, source_address -> Binary, @@ -39,12 +41,12 @@ table! { } } -table! { +diesel::table! { inbound_transactions (tx_id) { tx_id -> BigInt, source_address -> Binary, amount -> BigInt, - receiver_protocol -> Text, + receiver_protocol -> Binary, message -> Text, timestamp -> Timestamp, cancelled -> Integer, @@ -54,7 +56,7 @@ table! { } } -table! { +diesel::table! { known_one_sided_payment_scripts (script_hash) { script_hash -> Binary, private_key -> Text, @@ -64,13 +66,13 @@ table! { } } -table! { +diesel::table! { outbound_transactions (tx_id) { tx_id -> BigInt, destination_address -> Binary, amount -> BigInt, fee -> BigInt, - sender_protocol -> Text, + sender_protocol -> Binary, message -> Text, timestamp -> Timestamp, cancelled -> Integer, @@ -80,7 +82,7 @@ table! { } } -table! { +diesel::table! { outputs (id) { id -> Integer, commitment -> Binary, @@ -120,7 +122,7 @@ table! { } } -table! { +diesel::table! { scanned_blocks (header_hash) { header_hash -> Binary, height -> BigInt, @@ -130,14 +132,14 @@ table! { } } -table! { +diesel::table! { wallet_settings (key) { key -> Text, value -> Text, } } -allow_tables_to_appear_in_same_query!( +diesel::allow_tables_to_appear_in_same_query!( burnt_proofs, client_key_values, completed_transactions, diff --git a/base_layer/wallet/src/transaction_service/error.rs b/base_layer/wallet/src/transaction_service/error.rs index b69968d7c2..c7f4bf10c0 100644 --- a/base_layer/wallet/src/transaction_service/error.rs +++ b/base_layer/wallet/src/transaction_service/error.rs @@ -231,6 +231,8 @@ pub enum TransactionStorageError { UnexpectedResult(String), #[error("Bincode error: `{0}`")] BincodeSerialize(String), + #[error("Bincode error: `{0}`")] + BincodeDeserialize(String), #[error("This write operation is not supported for provided DbKey")] OperationNotSupported, #[error("Could not find all values specified for batch operation")] diff --git a/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs b/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs index 130082073a..30e17796d2 100644 --- a/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs +++ b/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs @@ -23,7 +23,6 @@ use std::{ collections::HashMap, convert::{TryFrom, TryInto}, - str::from_utf8, sync::{Arc, RwLock}, }; @@ -45,11 +44,7 @@ use tari_common_types::{ types::{BlockHash, PrivateKey, PublicKey, Signature}, }; use tari_core::transactions::tari_amount::MicroMinotari; -use tari_utilities::{ - hex::{from_hex, Hex}, - ByteArray, - Hidden, -}; +use tari_utilities::{ByteArray, Hidden}; use thiserror::Error; use tokio::time::Instant; use zeroize::Zeroize; @@ -1166,7 +1161,7 @@ struct InboundTransactionSql { tx_id: i64, source_address: Vec, amount: i64, - receiver_protocol: String, + receiver_protocol: Vec, message: String, timestamp: NaiveDateTime, cancelled: i32, @@ -1345,11 +1340,13 @@ impl InboundTransactionSql { } fn try_from(i: InboundTransaction, cipher: &XChaCha20Poly1305) -> Result { + let receiver_protocol_bytes = bincode::serialize(&i.receiver_protocol) + .map_err(|e| TransactionStorageError::BincodeSerialize(e.to_string()))?; let i = Self { tx_id: i.tx_id.as_u64() as i64, source_address: i.source_address.to_bytes().to_vec(), amount: u64::from(i.amount) as i64, - receiver_protocol: serde_json::to_string(&i.receiver_protocol)?, + receiver_protocol: receiver_protocol_bytes.to_vec(), message: i.message, timestamp: i.timestamp, cancelled: i32::from(i.cancelled), @@ -1376,26 +1373,15 @@ impl Encryptable for InboundTransactionSql { self.receiver_protocol = encrypt_bytes_integral_nonce( cipher, self.domain("receiver_protocol"), - Hidden::hide(self.receiver_protocol.as_bytes().to_vec()), - )? - .to_hex(); + Hidden::hide(self.receiver_protocol), + )?; Ok(self) } fn decrypt(mut self, cipher: &XChaCha20Poly1305) -> Result { - let mut decrypted_protocol = decrypt_bytes_integral_nonce( - cipher, - self.domain("receiver_protocol"), - &from_hex(self.receiver_protocol.as_str()).map_err(|e| e.to_string())?, - )?; - - self.receiver_protocol = from_utf8(decrypted_protocol.as_slice()) - .map_err(|e| e.to_string())? - .to_string(); - - // zeroize sensitive data - decrypted_protocol.zeroize(); + self.receiver_protocol = + decrypt_bytes_integral_nonce(cipher, self.domain("receiver_protocol"), &self.receiver_protocol)?; Ok(self) } @@ -1408,7 +1394,8 @@ impl InboundTransaction { tx_id: (i.tx_id as u64).into(), source_address: TariAddress::from_bytes(&i.source_address).map_err(TransactionKeyError::Source)?, amount: MicroMinotari::from(i.amount as u64), - receiver_protocol: serde_json::from_str(&i.receiver_protocol.clone())?, + receiver_protocol: bincode::deserialize(&i.receiver_protocol) + .map_err(|e| TransactionStorageError::BincodeDeserialize(e.to_string()))?, status: TransactionStatus::Pending, message: i.message, timestamp: i.timestamp, @@ -1425,7 +1412,7 @@ impl InboundTransaction { pub struct UpdateInboundTransactionSql { cancelled: Option, direct_send_success: Option, - receiver_protocol: Option, + receiver_protocol: Option>, send_count: Option, last_send_timestamp: Option>, } @@ -1438,7 +1425,7 @@ struct OutboundTransactionSql { destination_address: Vec, amount: i64, fee: i64, - sender_protocol: String, + sender_protocol: Vec, message: String, timestamp: NaiveDateTime, cancelled: i32, @@ -1601,12 +1588,14 @@ impl OutboundTransactionSql { } fn try_from(o: OutboundTransaction, cipher: &XChaCha20Poly1305) -> Result { + let sender_protocol_bytes = bincode::serialize(&o.sender_protocol) + .map_err(|e| TransactionStorageError::BincodeSerialize(e.to_string()))?; let outbound_tx = Self { tx_id: o.tx_id.as_u64() as i64, destination_address: o.destination_address.to_bytes().to_vec(), amount: u64::from(o.amount) as i64, fee: u64::from(o.fee) as i64, - sender_protocol: serde_json::to_string(&o.sender_protocol)?, + sender_protocol: sender_protocol_bytes.to_vec(), message: o.message, timestamp: o.timestamp, cancelled: i32::from(o.cancelled), @@ -1634,27 +1623,15 @@ impl Encryptable for OutboundTransactionSql { self.sender_protocol = encrypt_bytes_integral_nonce( cipher, self.domain("sender_protocol"), - Hidden::hide(self.sender_protocol.as_bytes().to_vec()), - )? - .to_hex(); + Hidden::hide(self.sender_protocol), + )?; Ok(self) } fn decrypt(mut self, cipher: &XChaCha20Poly1305) -> Result { - let mut decrypted_protocol = decrypt_bytes_integral_nonce( - cipher, - self.domain("sender_protocol"), - &from_hex(self.sender_protocol.as_str()).map_err(|e| e.to_string())?, - )?; - - self.sender_protocol = from_utf8(decrypted_protocol.as_slice()) - .map_err(|e| e.to_string())? - .to_string(); - - // zeroize sensitive data - decrypted_protocol.zeroize(); - + self.sender_protocol = + decrypt_bytes_integral_nonce(cipher, self.domain("sender_protocol"), &self.sender_protocol)?; Ok(self) } } @@ -1662,14 +1639,14 @@ impl Encryptable for OutboundTransactionSql { impl OutboundTransaction { fn try_from(o: OutboundTransactionSql, cipher: &XChaCha20Poly1305) -> Result { let mut o = o.decrypt(cipher).map_err(TransactionStorageError::AeadError)?; - let outbound_tx = Self { tx_id: (o.tx_id as u64).into(), destination_address: TariAddress::from_bytes(&o.destination_address) .map_err(TransactionKeyError::Destination)?, amount: MicroMinotari::from(o.amount as u64), fee: MicroMinotari::from(o.fee as u64), - sender_protocol: serde_json::from_str(&o.sender_protocol.clone())?, + sender_protocol: bincode::deserialize(&o.sender_protocol) + .map_err(|e| TransactionStorageError::BincodeDeserialize(e.to_string()))?, status: TransactionStatus::Pending, message: o.message, timestamp: o.timestamp, @@ -1691,7 +1668,7 @@ impl OutboundTransaction { pub struct UpdateOutboundTransactionSql { cancelled: Option, direct_send_success: Option, - sender_protocol: Option, + sender_protocol: Option>, send_count: Option, last_send_timestamp: Option>, } diff --git a/base_layer/wallet/tests/transaction_service_tests/service.rs b/base_layer/wallet/tests/transaction_service_tests/service.rs index 4326665cac..d922c4d92d 100644 --- a/base_layer/wallet/tests/transaction_service_tests/service.rs +++ b/base_layer/wallet/tests/transaction_service_tests/service.rs @@ -660,6 +660,182 @@ async fn manage_single_transaction() { assert_eq!(bob_oms.get_balance().await.unwrap().pending_incoming_balance, value); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn large_interactive_transaction() { + let network = Network::LocalNet; + let consensus_manager = ConsensusManager::builder(network).build().unwrap(); + let factories = CryptoFactories::default(); + + // Alice's parameters + let alice_node_identity = Arc::new(NodeIdentity::random( + &mut OsRng, + get_next_memory_address(), + PeerFeatures::COMMUNICATION_NODE, + )); + + // Bob's parameters + let bob_node_identity = Arc::new(NodeIdentity::random( + &mut OsRng, + get_next_memory_address(), + PeerFeatures::COMMUNICATION_NODE, + )); + + let base_node_identity = Arc::new(NodeIdentity::random( + &mut OsRng, + get_next_memory_address(), + PeerFeatures::COMMUNICATION_NODE, + )); + + log::info!( + "large_interactive_transaction: Alice: '{}', Bob: '{}', Base: '{}'", + alice_node_identity.node_id().short_str(), + bob_node_identity.node_id().short_str(), + base_node_identity.node_id().short_str() + ); + let temp_dir = tempdir().unwrap(); + let database_path = temp_dir.path().to_str().unwrap().to_string(); + let (alice_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let (bob_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + + // Alice sets up her Transaction Service + let shutdown = Shutdown::new(); + let (mut alice_ts, mut alice_oms, _alice_comms, _alice_connectivity, alice_key_manager_handle) = + setup_transaction_service( + alice_node_identity.clone(), + vec![], + consensus_manager.clone(), + factories.clone(), + alice_connection, + database_path.clone(), + Duration::from_secs(0), + shutdown.to_signal(), + ) + .await; + let mut alice_event_stream = alice_ts.get_event_stream(); + + sleep(Duration::from_secs(2)).await; + + // Bob sets up his Transaction Service + let (mut bob_ts, mut bob_oms, bob_comms, _bob_connectivity, _bob_key_manager_handle) = setup_transaction_service( + bob_node_identity.clone(), + vec![alice_node_identity.clone()], + consensus_manager, + factories.clone(), + bob_connection, + database_path, + Duration::from_secs(0), + shutdown.to_signal(), + ) + .await; + let mut bob_event_stream = bob_ts.get_event_stream(); + + // Verify that Alice and Bob are connected + let _peer_connection = bob_comms + .connectivity() + .dial_peer(alice_node_identity.node_id().clone()) + .await + .unwrap(); + + // Alice prepares her large transaction + let outputs_count = 1250u64; + let output_value = MicroMinotari(20000); + for _ in 0..outputs_count { + let uo = make_input( + &mut OsRng, + output_value, + &OutputFeatures::default(), + &alice_key_manager_handle, + ) + .await; + alice_oms.add_output(uo, None).await.unwrap(); + } + let transaction_value = output_value * (outputs_count - 1); + let bob_address = TariAddress::new(bob_node_identity.public_key().clone(), network); + + let message = "TAKE MAH MONEYS!".to_string(); + alice_ts + .send_transaction( + bob_address, + transaction_value, + UtxoSelectionCriteria::default(), + OutputFeatures::default(), + MicroMinotari::from(1), + message, + ) + .await + .expect("Alice sending large tx"); + + // Monitor Alice's and Bob's event streams for the transaction send results + let delay = sleep(Duration::from_secs(90)); + tokio::pin!(delay); + let mut bob_finalized = false; + let mut alice_finalized = false; + let mut tx_id = TxId::from(0u64); + loop { + tokio::select! { + event = alice_event_stream.recv() => { + // println!("alice: {:?}", event.as_ref().unwrap()); + match &*event.clone().unwrap() { + TransactionEvent::TransactionSendResult(id, _) => { + // We want to ensure that we can get the pending outbound transaction from the database, + // and excercise the sender_protocol + let pending_outbound = alice_ts.get_pending_outbound_transactions().await.unwrap(); + pending_outbound.get(id).unwrap().sender_protocol.get_amount_to_recipient().unwrap(); + assert_eq!( + pending_outbound.get(id).unwrap().sender_protocol.get_amount_to_recipient().unwrap(), + transaction_value + ); + }, + TransactionEvent::ReceivedTransactionReply(_) => { + alice_finalized = true; + if alice_finalized && bob_finalized { + break; + } + }, + _ => (), + } + }, + event = bob_event_stream.recv() => { + // println!("bob: {:?}", event.as_ref().unwrap()); + match &*event.clone().unwrap() { + TransactionEvent::ReceivedTransaction(id) => { + // We want to ensure that we can get the pending inbound transaction from the database, + // and excercise the receiver_protocol + let pending_inbound = bob_ts.get_pending_inbound_transactions().await.unwrap(); + assert!(pending_inbound.get(id).unwrap().receiver_protocol.get_signed_data().is_ok()); + assert_eq!(pending_inbound.get(id).unwrap().amount, transaction_value); + }, + TransactionEvent::ReceivedFinalizedTransaction(id) => { + tx_id = *id; + bob_finalized = true; + if alice_finalized && bob_finalized { + break; + } + }, + _ => (), + } + }, + () = &mut delay => { + break; + }, + } + } + assert!(bob_finalized && alice_finalized); + + let bob_completed_tx = bob_ts + .get_completed_transaction(tx_id) + .await + .expect("Could not find tx"); + assert_eq!( + bob_completed_tx.transaction.body.inputs().len(), + usize::try_from(outputs_count).unwrap() + ); + assert_eq!( + bob_oms.get_balance().await.unwrap().pending_incoming_balance, + transaction_value + ); +} + #[tokio::test] async fn single_transaction_to_self() { let network = Network::LocalNet;