diff --git a/applications/minotari_console_wallet/src/automation/commands.rs b/applications/minotari_console_wallet/src/automation/commands.rs index e6d7344f5d..f64e13062d 100644 --- a/applications/minotari_console_wallet/src/automation/commands.rs +++ b/applications/minotari_console_wallet/src/automation/commands.rs @@ -741,6 +741,14 @@ pub async fn command_runner( println!("Command Runner"); println!("=============="); + let (_current_index, mut peer_list) = + if let Some((index, list)) = wallet.wallet_connectivity.get_base_node_peer_manager_state() { + (index, list) + } else { + (0, vec![]) + }; + let mut unban_per_manager_peers = false; + #[allow(clippy::enum_glob_use)] for (idx, parsed) in commands.into_iter().enumerate() { println!("\n{}. {:?}\n", idx + 1, parsed); @@ -868,12 +876,14 @@ pub async fn command_runner( } let mut recipient_info = Vec::new(); + let mut error = false; for item in args_recipient_info { if args.verify_unspent_outputs && !args.use_pre_mine_input_file { let embedded_outputs = match get_embedded_pre_mine_outputs(item.output_indexes.clone(), None) { Ok(outputs) => outputs, Err(e) => { eprintln!("\nError: {}\n", e); + error = true; break; }, }; @@ -890,6 +900,7 @@ pub async fn command_runner( "\nError: Outputs with output_hashes '{:?}' has already been spent!\n", missing.iter().map(|v| v.to_hex()).collect::>(), ); + error = true; break; } } @@ -901,6 +912,9 @@ pub async fn command_runner( }); } } + if error { + break; + } let (session_id, out_dir) = match create_pre_mine_output_dir(None) { Ok(values) => values, @@ -1032,9 +1046,17 @@ pub async fn command_runner( }, }; + println!(); let mut outputs_for_leader = Vec::with_capacity(args_recipient_info.len()); let mut outputs_for_self = Vec::with_capacity(args_recipient_info.len()); - for recipient_info in &args_recipient_info { + let mut error = false; + for (i, recipient_info) in args_recipient_info.iter().enumerate() { + println!( + " Start processing {} of {} recipients, current wallet {}", + i + 1, + args_recipient_info.len(), + recipient_info.recipient_address + ); let embedded_outputs = match get_embedded_pre_mine_outputs( recipient_info.output_indexes.clone(), pre_mine_from_file.clone(), @@ -1042,6 +1064,7 @@ pub async fn command_runner( Ok(outputs) => outputs, Err(e) => { eprintln!("\nError: {}\n", e); + error = true; break; }, }; @@ -1050,7 +1073,9 @@ pub async fn command_runner( .map(|v| v.commitment.clone()) .collect::>(); - for (output_index, commitment) in recipient_info.output_indexes.iter().zip(commitments.iter()) { + for (j, (output_index, commitment)) in + recipient_info.output_indexes.iter().zip(commitments.iter()).enumerate() + { let script_nonce_key = key_manager_service.get_random_key().await?; let sender_offset_key = key_manager_service.get_random_key().await?; let sender_offset_nonce = key_manager_service.get_random_key().await?; @@ -1079,6 +1104,7 @@ pub async fn command_runner( "\nError: Could not retrieve script key for output {}: {}\n", output_index, e ); + error = true; break; }, }; @@ -1105,8 +1131,19 @@ pub async fn command_runner( sender_offset_nonce_key_id: sender_offset_nonce.key_id, pre_mine_script_key_id, }); + println!( + " Processed {} of {} transactions", + j + 1, + recipient_info.output_indexes.len() + ); + } + if error { + break; } } + if error { + break; + } let out_dir = out_dir(&session_info.session_id)?; let out_file_leader = out_dir.join(get_file_name(SPEND_STEP_2_LEADER, Some(args.alias.clone()))); @@ -1142,6 +1179,9 @@ pub async fn command_runner( }, } + temp_ban_peers(&wallet, &mut peer_list).await; + unban_per_manager_peers = true; + // Read session info let session_info = read_verify_session_info::(&args.session_id)?; let session_info_indexed = session_info @@ -1159,6 +1199,7 @@ pub async fn command_runner( &session_info, )?); } + let mut error = false; for party in &party_info { let this_party_info = party .outputs_for_leader @@ -1179,9 +1220,13 @@ pub async fn command_runner( .map(|(index, address)| (*index, address.to_hex().clone())) .collect::>(), ); + error = true; break; } } + if error { + break; + } // Flatten and transpose party_info to be indexed by output index let party_info_flattened = party_info @@ -1216,7 +1261,8 @@ pub async fn command_runner( return Ok(()); }, }; - for indexed_info in party_info_per_index { + println!(); + for (i, indexed_info) in party_info_per_index.iter().enumerate() { #[allow(clippy::mutable_key_type)] let mut input_shares = HashMap::new(); let mut script_signature_public_nonces = Vec::with_capacity(indexed_info.len()); @@ -1231,6 +1277,7 @@ pub async fn command_runner( "\nError: Mismatched output indexes detected! (expected {}, got {})\n", current_index, item.output_index ); + error = true; break; } if current_recipient_address != item.recipient_address { @@ -1238,13 +1285,20 @@ pub async fn command_runner( "\nError: Mismatched recipient addresses detected! (expected {}, got {})\n", current_recipient_address, item.recipient_address ); + error = true; break; } - input_shares.insert(item.pre_mine_public_script_key, item.script_input_signature); - script_signature_public_nonces.push(item.public_script_nonce_key); - sender_offset_public_key_shares.push(item.public_sender_offset_key); - metadata_ephemeral_public_key_shares.push(item.public_sender_offset_nonce_key); - dh_shared_secret_shares.push(item.dh_shared_secret_public_key); + input_shares.insert( + item.pre_mine_public_script_key.clone(), + item.script_input_signature.clone(), + ); + script_signature_public_nonces.push(item.public_script_nonce_key.clone()); + sender_offset_public_key_shares.push(item.public_sender_offset_key.clone()); + metadata_ephemeral_public_key_shares.push(item.public_sender_offset_nonce_key.clone()); + dh_shared_secret_shares.push(item.dh_shared_secret_public_key.clone()); + } + if error { + break; } let original_maturity = pre_mine_items[current_index].original_maturity; @@ -1253,6 +1307,7 @@ pub async fn command_runner( Ok(outputs) => outputs[0].clone(), Err(e) => { eprintln!("\nError: {}\n", e); + error = true; break; }, }; @@ -1314,8 +1369,16 @@ pub async fn command_runner( tx_id, }); }, - Err(e) => eprintln!("\nError: Encumber aggregate transaction error! {}\n", e), + Err(e) => { + eprintln!("\nError: Encumber aggregate transaction error! {}\n", e); + error = true; + break; + }, } + println!(" Processed {} of {} transactions", i + 1, party_info_per_index.len()); + } + if error { + break; } let out_dir = out_dir(&args.session_id)?; @@ -1396,11 +1459,14 @@ pub async fn command_runner( }, }; + println!(); let mut outputs_for_leader = Vec::with_capacity(party_info_indexed.outputs_for_self.len()); - for (leader_info, party_info) in leader_info_indexed + let mut error = false; + for (i, (leader_info, party_info)) in leader_info_indexed .outputs_for_parties .iter() .zip(party_info_indexed.outputs_for_self.iter()) + .enumerate() { let embedded_output = match get_embedded_pre_mine_outputs( vec![party_info.output_index], @@ -1409,6 +1475,7 @@ pub async fn command_runner( Ok(outputs) => outputs[0].clone(), Err(e) => { eprintln!("\nError: {}\n", e); + error = true; break; }, }; @@ -1435,6 +1502,7 @@ pub async fn command_runner( Ok(signature) => signature, Err(e) => { eprintln!("\nError: Script signature SignMessage error! {}\n", e); + error = true; break; }, }; @@ -1446,6 +1514,7 @@ pub async fn command_runner( Ok(v) => v, Err(e) => { eprintln!("\nError: Could not create shared secret from canonical bytes! {}\n", e); + error = true; break; }, }; @@ -1459,6 +1528,7 @@ pub async fn command_runner( Ok((value, mask, id)) => (value, mask, id), Err(e) => { eprintln!("\nError: Could not decrypt data! {}\n", e); + error = true; break; }, }; @@ -1476,6 +1546,7 @@ pub async fn command_runner( Ok(_) => {}, Err(e) => { eprintln!("\nError: Could not verify mask! {}\n", e); + error = true; break; }, } @@ -1518,6 +1589,7 @@ pub async fn command_runner( Ok(signature) => signature, Err(e) => { eprintln!("\nError: Metadata signature SignMessage error! {}\n", e); + error = true; break; }, }; @@ -1529,6 +1601,7 @@ pub async fn command_runner( "\nError: Script and/or metadata signatures not created (index {})!\n", party_info.output_index ); + error = true; break; } @@ -1538,6 +1611,15 @@ pub async fn command_runner( metadata_signature, script_offset, }); + + println!( + " Processed {} of {} transactions", + i + 1, + leader_info_indexed.outputs_for_parties.len() + ); + } + if error { + break; } let out_dir = out_dir(&args.session_id)?; @@ -1568,6 +1650,9 @@ pub async fn command_runner( }, } + temp_ban_peers(&wallet, &mut peer_list).await; + unban_per_manager_peers = true; + // Read session info let session_info = read_verify_session_info::(&args.session_id)?; @@ -1605,6 +1690,7 @@ pub async fn command_runner( ); break; } + let mut error = false; for party in &party_info { let party_info_indexes = party .outputs_for_leader @@ -1616,9 +1702,13 @@ pub async fn command_runner( "\nError: Mismatched output indexes from '{}' detected! session {:?} vs. party {:?}\n", party.alias, session_info_indexes, party_info_indexes ); + error = true; break; } } + if error { + break; + } // Flatten and transpose party_info to be indexed by output index let party_info_flattened = party_info @@ -1640,7 +1730,10 @@ pub async fn command_runner( let mut outputs = Vec::new(); let mut kernels = Vec::new(); let mut kernel_offset = PrivateKey::default(); - for (indexed_info, leader_self) in party_info_per_index.iter().zip(leader_info.outputs_for_self.iter()) + for (i, (indexed_info, leader_self)) in party_info_per_index + .iter() + .zip(leader_info.outputs_for_self.iter()) + .enumerate() { let mut metadata_signatures = Vec::with_capacity(party_info_per_index.len()); let mut script_signatures = Vec::with_capacity(party_info_per_index.len()); @@ -1664,10 +1757,12 @@ pub async fn command_runner( "\nError: Error completing transaction '{}'! ({})\n", leader_self.tx_id, e ); + error = true; break; } // Collect all inputs, outputs and kernels that should go into the genesis block + println!(); if session_info.use_pre_mine_input_file { match transaction_service.get_any_transaction(leader_self.tx_id).await { Ok(Some(WalletTransaction::Completed(tx))) => { @@ -1679,11 +1774,13 @@ pub async fn command_runner( "\nError: Transaction {} fee ({}) for does not equal zero!\n", tx.tx_id, fee ); + error = true; break; } }, Err(e) => { eprintln!("\nError: Transaction {}! ({})\n", tx.tx_id, e); + error = true; break; }, } @@ -1699,10 +1796,14 @@ pub async fn command_runner( Ok(commitment) => utxo_sum = &utxo_sum - commitment, Err(e) => { eprintln!("\nError: Input commitment ({})!\n", e); + error = true; break; }, } } + if error { + break; + } let mut kernel_sum = Commitment::default(); for kernel in tx.transaction.body.kernels() { kernels.push(kernel.clone()); @@ -1722,6 +1823,8 @@ pub async fn command_runner( utxo_sum.to_hex(), (&kernel_sum + &offset).to_hex() ); + error = true; + break; } }, Ok(_) => { @@ -1737,6 +1840,11 @@ pub async fn command_runner( }, } } + + println!(" Processed {} of {}", i + 1, party_info_per_index.len()); + } + if error { + break; } let file_name = get_pre_mine_addition_file_name(); @@ -2522,6 +2630,9 @@ pub async fn command_runner( }, } } + if unban_per_manager_peers { + lift_temp_ban_peers(&wallet, &mut peer_list).await; + } // listen to event stream if tx_ids.is_empty() { @@ -2561,6 +2672,44 @@ pub async fn command_runner( Ok(()) } +async fn temp_ban_peers(wallet: &WalletSqlite, peer_list: &mut Vec) { + for peer in peer_list { + let _unused = wallet + .comms + .connectivity() + .remove_peer_from_allow_list(peer.node_id.clone()) + .await; + let _unused = wallet + .comms + .connectivity() + .ban_peer_until( + peer.node_id.clone(), + Duration::from_secs(24 * 60 * 60), + "Busy with pre-mine spend".to_string(), + ) + .await; + } +} + +async fn lift_temp_ban_peers(wallet: &WalletSqlite, peer_list: &mut Vec) { + for peer in peer_list { + let _unused = wallet + .comms + .connectivity() + .ban_peer_until( + peer.node_id.clone(), + Duration::from_millis(1), + "Busy with pre-mine spend".to_string(), + ) + .await; + let _unused = wallet + .comms + .connectivity() + .add_peer_to_allow_list(peer.node_id.clone()) + .await; + } +} + fn read_genesis_file_outputs( use_pre_mine_input_file: bool, pre_mine_file_path: Option, diff --git a/base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs b/base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs index 2a125f25a3..7bb83c7dc9 100644 --- a/base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs +++ b/base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs @@ -65,6 +65,11 @@ impl BaseNodePeerManager { self.current_peer_index = (self.current_peer_index + 1) % self.peer_list.len(); self.peer_list[self.current_peer_index].clone() } + + /// Get the base node peer manager state + pub fn get_state(&self) -> (usize, Vec) { + (self.current_peer_index, self.peer_list.clone()) + } } impl Display for BaseNodePeerManager { diff --git a/base_layer/wallet/src/connectivity_service/handle.rs b/base_layer/wallet/src/connectivity_service/handle.rs index af9bb72fc4..876ad089d3 100644 --- a/base_layer/wallet/src/connectivity_service/handle.rs +++ b/base_layer/wallet/src/connectivity_service/handle.rs @@ -62,19 +62,23 @@ impl WalletConnectivityHandle { #[async_trait::async_trait] impl WalletConnectivityInterface for WalletConnectivityHandle { - fn set_base_node(&mut self, base_node_peer: BaseNodePeerManager) { + fn set_base_node(&mut self, base_node_peer_manager: BaseNodePeerManager) { if let Some(selected_peer) = self.base_node_watch.borrow().as_ref() { - if selected_peer.get_current_peer().public_key == base_node_peer.get_current_peer().public_key { + if selected_peer.get_current_peer().public_key == base_node_peer_manager.get_current_peer().public_key { return; } } - self.base_node_watch.send(Some(base_node_peer)); + self.base_node_watch.send(Some(base_node_peer_manager)); } fn get_current_base_node_watcher(&self) -> watch::Receiver> { self.base_node_watch.get_receiver() } + fn get_base_node_peer_manager_state(&self) -> Option<(usize, Vec)> { + self.base_node_watch.borrow().as_ref().map(|p| p.get_state().clone()) + } + /// Obtain a BaseNodeWalletRpcClient. /// /// This can be relied on to obtain a pooled BaseNodeWalletRpcClient rpc session from a currently selected base diff --git a/base_layer/wallet/src/connectivity_service/interface.rs b/base_layer/wallet/src/connectivity_service/interface.rs index e0a3639b2f..e974df59eb 100644 --- a/base_layer/wallet/src/connectivity_service/interface.rs +++ b/base_layer/wallet/src/connectivity_service/interface.rs @@ -75,4 +75,6 @@ pub trait WalletConnectivityInterface: Clone + Send + Sync + 'static { fn get_current_base_node_peer_node_id(&self) -> Option; fn is_base_node_set(&self) -> bool; + + fn get_base_node_peer_manager_state(&self) -> Option<(usize, Vec)>; } diff --git a/base_layer/wallet/src/connectivity_service/mock.rs b/base_layer/wallet/src/connectivity_service/mock.rs index f61625ec32..e34f4eca47 100644 --- a/base_layer/wallet/src/connectivity_service/mock.rs +++ b/base_layer/wallet/src/connectivity_service/mock.rs @@ -90,6 +90,10 @@ impl WalletConnectivityInterface for WalletConnectivityMock { self.base_node_watch.get_receiver() } + fn get_base_node_peer_manager_state(&self) -> Option<(usize, Vec)> { + self.base_node_watch.borrow().as_ref().map(|p| p.get_state().clone()) + } + async fn obtain_base_node_wallet_rpc_client(&mut self) -> Option> { let mut receiver = self.base_node_wallet_rpc_client.get_receiver(); if let Some(client) = receiver.borrow().as_ref() {