Skip to content

Commit

Permalink
fix: make tor startup async
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler authored and SWvheerden committed Jan 22, 2024
1 parent 7a54cf2 commit 1a7000b
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 72 deletions.
21 changes: 15 additions & 6 deletions applications/minotari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#![allow(dead_code, unused)]

use std::{fs, path::PathBuf, str::FromStr, sync::Arc};
use std::time::Instant;

use log::*;
use minotari_app_utilities::identity_management::setup_node_identity;
Expand Down Expand Up @@ -442,6 +443,8 @@ pub async fn init_wallet(
.map_err(|e| ExitError::new(ExitCode::WalletError, format!("Error consensus manager. {}", e)))?;
let factories = CryptoFactories::default();

let now = Instant::now();

let mut wallet = Wallet::start(
wallet_config,
config.peer_seeds.clone(),
Expand All @@ -463,12 +466,18 @@ pub async fn init_wallet(
WalletError::CommsInitializationError(cie) => cie.to_exit_error(),
e => ExitError::new(ExitCode::WalletError, format!("Error creating Wallet Container: {}", e)),
})?;
if let Some(hs) = wallet.comms.hidden_service() {
wallet
.db
.set_tor_identity(hs.tor_identity().clone())
.map_err(|e| ExitError::new(ExitCode::WalletError, format!("Problem writing tor identity. {}", e)))?;
}
// TODO: fix this
// if let Some(hs) = wallet.comms.hidden_service() {
// wallet
// .db
// .set_tor_identity(hs.tor_identity().clone())
// .map_err(|e| ExitError::new(ExitCode::WalletError, format!("Problem writing tor identity. {}", e)))?;
// }

error!(
target: LOG_TARGET,
"Wallet started in {}ms", now.elapsed().as_millis()
);

if let Some(file_name) = seed_words_file_name {
let seed_words = wallet.get_seed_words(&MnemonicLanguage::English)?.join(" ");
Expand Down
9 changes: 5 additions & 4 deletions applications/minotari_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,11 @@ where B: BlockchainBackend + 'static
.map_err(|e| ExitError::new(ExitCode::IdentityError, e))?;
},
};
if let Some(hs) = comms.hidden_service() {
identity_management::save_as_json(&base_node_config.tor_identity_file, hs.tor_identity())
.map_err(|e| ExitError::new(ExitCode::IdentityError, e))?;
}
todo!("Fix this");
// if let Some(hs) = comms.hidden_service() {
// identity_management::save_as_json(&base_node_config.tor_identity_file, hs.tor_identity())
// .map_err(|e| ExitError::new(ExitCode::IdentityError, e))?;
// }

handles.register(comms);

Expand Down
9 changes: 5 additions & 4 deletions base_layer/contacts/src/chat_client/src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,11 @@ pub async fn start(
trace!(target: LOG_TARGET, "save chat identity file");
},
};
if let Some(hs) = comms.hidden_service() {
identity_management::save_as_json(&config.chat_client.tor_identity_file, hs.tor_identity())?;
trace!(target: LOG_TARGET, "resave the chat tor identity {:?}", hs.tor_identity());
}
todo!("Fix this");
// if let Some(hs) = comms.hidden_service() {
// identity_management::save_as_json(&config.chat_client.tor_identity_file, hs.tor_identity())?;
// trace!(target: LOG_TARGET, "resave the chat tor identity {:?}", hs.tor_identity());
// }
handles.register(comms);

let comms = handles.expect_handle::<CommsNode>();
Expand Down
14 changes: 6 additions & 8 deletions base_layer/p2p/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ use tari_storage::{
use thiserror::Error;
use tokio::sync::{broadcast, mpsc};
use tower::ServiceBuilder;
use tari_comms::transports::HiddenServiceTransport;

use crate::{
comms_connector::{InboundDomainConnector, PubsubDomainConnector},
Expand Down Expand Up @@ -251,20 +252,17 @@ pub async fn spawn_comms_using_transport(
let listener_address_override = tor_config.listener_address_override.clone();
let mut hidden_service_ctl = initialize_hidden_service(tor_config)?;
// Set the listener address to be the address (usually local) to which tor will forward all traffic
let transport = hidden_service_ctl.initialize_transport().await?;
let instant = Instant::now();
let transport = HiddenServiceTransport::new(hidden_service_ctl);
error!(target: LOG_TARGET, "TOR transport initialized in {:.0?}", instant.elapsed());


info!(
target: LOG_TARGET,
"Tor hidden service initialized. proxied_address = '{:?}', listener_override_address = {:?}",
hidden_service_ctl.proxied_address(),
listener_address_override,
);

comms
.with_listener_address(
listener_address_override.unwrap_or_else(|| multiaddr![Ip4([127, 0, 0, 1]), Tcp(0u16)]),
)
.with_hidden_service_controller(hidden_service_ctl)
// .with_hidden_service_controller(hidden_service_ctl)
.spawn_with_transport(transport)
.await?
},
Expand Down
28 changes: 14 additions & 14 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5472,11 +5472,11 @@ pub unsafe extern "C" fn wallet_create(
match w {
Ok(w) => {
// lets ensure the wallet tor_id is saved, this could have been changed during wallet startup
if let Some(hs) = w.comms.hidden_service() {
if let Err(e) = w.db.set_tor_identity(hs.tor_identity().clone()) {
warn!(target: LOG_TARGET, "Could not save tor identity to db: {:?}", e);
}
}
// if let Some(hs) = w.comms.hidden_service() {
// if let Err(e) = w.db.set_tor_identity(hs.tor_identity().clone()) {
// warn!(target: LOG_TARGET, "Could not save tor identity to db: {:?}", e);
// }
// }
let wallet_address = TariAddress::new(w.comms.node_identity().public_key().clone(), w.network.as_network());

// Start Callback Handler
Expand Down Expand Up @@ -5512,15 +5512,15 @@ pub unsafe extern "C" fn wallet_create(

runtime.spawn(callback_handler.start());

let mut ts = w.transaction_service.clone();
runtime.spawn(async move {
if let Err(e) = ts.restart_transaction_protocols().await {
warn!(
target: LOG_TARGET,
"Could not restart transaction negotiation protocols: {:?}", e
);
}
});
// let mut ts = w.transaction_service.clone();
// runtime.spawn(async move {
// if let Err(e) = ts.restart_transaction_protocols().await {
// warn!(
// target: LOG_TARGET,
// "Could not restart transaction negotiation protocols: {:?}", e
// );
// }
// });

let tari_wallet = TariWallet {
wallet: w,
Expand Down
69 changes: 33 additions & 36 deletions comms/core/src/builder/comms_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{iter, sync::Arc, time::Duration};
use std::time::Instant;

use log::*;
use multiaddr::{multiaddr, Protocol};
Expand Down Expand Up @@ -218,28 +219,30 @@ impl UnspawnedCommsNode {
node_identity.node_id()
);

let listening_info = connection_manager_requester.wait_until_listening().await?;

// let instant = Instant::now();
//
// let listening_info = connection_manager_requester.wait_until_listening().await?;
// error!(target: LOG_TARGET, "Waited for {} to connect", instant.elapsed().as_millis());
// Final setup of the hidden service.
let mut hidden_service = None;
if let Some(mut ctl) = hidden_service_ctl {
// Only set the address to the bind address it is set to TCP port 0
let mut proxied_addr = ctl.proxied_address();
if proxied_addr.ends_with(&multiaddr!(Tcp(0u16))) {
// Remove the TCP port 0 address and replace it with the actual listener port
if let Some(Protocol::Tcp(port)) = listening_info.bind_address().iter().last() {
proxied_addr.pop();
proxied_addr.push(Protocol::Tcp(port));
ctl.set_proxied_addr(&proxied_addr);
}
}
let hs = ctl.create_hidden_service().await?;
let onion_addr = hs.get_onion_address();
if !node_identity.public_addresses().contains(&onion_addr) {
node_identity.add_public_address(onion_addr);
}
hidden_service = Some(hs);
}
// let mut hidden_service = None;
// if let Some(mut ctl) = hidden_service_ctl {
// // Only set the address to the bind address it is set to TCP port 0
// let mut proxied_addr = ctl.proxied_address();
// if proxied_addr.ends_with(&multiaddr!(Tcp(0u16))) {
// // Remove the TCP port 0 address and replace it with the actual listener port
// if let Some(Protocol::Tcp(port)) = listening_info.bind_address().iter().last() {
// proxied_addr.pop();
// proxied_addr.push(Protocol::Tcp(port));
// ctl.set_proxied_addr(&proxied_addr);
// }
// }
// let hs = ctl.create_hidden_service().await?;
// let onion_addr = hs.get_onion_address();
// if !node_identity.public_addresses().contains(&onion_addr) {
// node_identity.add_public_address(onion_addr);
// }
// hidden_service = Some(hs);
// }
info!(
target: LOG_TARGET,
"Your node's public addresses are '{}'",
Expand All @@ -266,11 +269,10 @@ impl UnspawnedCommsNode {
shutdown_signal,
connection_manager_requester,
connectivity_requester,
listening_info,
node_identity,
peer_manager,
liveness_watch,
hidden_service,
// hidden_service,
complete_signals: ext_context.drain_complete_signals(),
})
}
Expand Down Expand Up @@ -313,11 +315,11 @@ pub struct CommsNode {
/// Shared PeerManager instance
peer_manager: Arc<PeerManager>,
/// The bind addresses of the listener(s)
listening_info: ListenerInfo,
// listening_info: ListenerInfo,
/// Current liveness status
liveness_watch: watch::Receiver<LivenessStatus>,
/// `Some` if the comms node is configured to run via a hidden service, otherwise `None`
hidden_service: Option<tor::HiddenService>,
//hidden_service: Option<tor::HiddenService>,
/// The 'reciprocal' shutdown signals for each comms service
complete_signals: Vec<ShutdownSignal>,
}
Expand Down Expand Up @@ -349,25 +351,20 @@ impl CommsNode {
}

/// Return the Ip/Tcp address that this node is listening on
pub fn listening_address(&self) -> &Multiaddr {
self.listening_info.bind_address()
}
// pub fn listening_address(&self) -> &Multiaddr {
// self.listening_info.bind_address()
// }

/// Return [ListenerInfo]
pub fn listening_info(&self) -> &ListenerInfo {
&self.listening_info
}
// pub fn listening_info(&self) -> &ListenerInfo {
// &self.listening_info
// }

/// Returns the current liveness status
pub fn liveness_status(&self) -> LivenessStatus {
*self.liveness_watch.borrow()
}

/// Return the Ip/Tcp address that this node is listening on
pub fn hidden_service(&self) -> Option<&tor::HiddenService> {
self.hidden_service.as_ref()
}

/// Return a handle that is used to call the connectivity service.
pub fn connectivity(&self) -> ConnectivityRequester {
self.connectivity_requester.clone()
Expand Down
3 changes: 3 additions & 0 deletions comms/core/src/tor/hidden_service/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ impl HiddenServiceController {
}

pub async fn initialize_transport(&mut self) -> Result<SocksTransport, HiddenServiceControllerError> {
dbg!("here3");
self.connect_and_auth().await?;
dbg!("here4");
let socks_addr = self.get_socks_address().await?;
Ok(SocksTransport::new(SocksConfig {
proxy_address: socks_addr,
Expand Down Expand Up @@ -235,6 +237,7 @@ impl HiddenServiceController {
}

fn client_mut(&mut self) -> Result<&mut TorControlPortClient, HiddenServiceControllerError> {
dbg!("here5");
self.client
.as_mut()
.filter(|c| c.is_connected())
Expand Down
100 changes: 100 additions & 0 deletions comms/core/src/transports/hidden_service_transport.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2022. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::io;
use std::sync::Arc;
use log::info;
use multiaddr::Multiaddr;
use tokio::sync::RwLock;
use crate::tor::HiddenServiceController;
use crate::transports::{SocksTransport, TcpTransport, Transport};
use crate::transports::tcp::TcpInbound;

const LOG_TARGET: &str = "comms::transports::hidden_service_transport";

#[derive(thiserror::Error, Debug)]
pub enum HiddenServiceTransportError {
#[error("Tor hidden service transport error: `{0}`")]
HiddenServiceControllerError(#[from] crate::tor::HiddenServiceControllerError),
#[error("Tor hidden service socks error: `{0}`")]
SocksTransportError(#[from] io::Error),

}

struct HiddenServiceTransportInner {
socks_transport: Option<SocksTransport>,
hidden_service_ctl: HiddenServiceController

}

#[derive(Clone)]
pub struct HiddenServiceTransport {
inner: Arc<RwLock<HiddenServiceTransportInner>>
}

impl HiddenServiceTransport {
pub fn new(hidden_service_ctl: HiddenServiceController) -> Self {
Self {
inner : Arc::new(RwLock::new(HiddenServiceTransportInner {
socks_transport: None,
hidden_service_ctl
}))
}
}

async fn ensure_initialized(&self) -> Result<(), io::Error> {
let inner = self.inner.read().await;
if inner.socks_transport.is_none() {
drop(inner);
let mut mut_inner = self.inner.write().await;
if mut_inner.socks_transport.is_none() {
let transport = mut_inner.hidden_service_ctl.initialize_transport().await.expect("TODO NEED TO MAP THESE ERRORS SOMEHOW");
mut_inner.socks_transport = Some(transport);
}
}
Ok(())
}
}
#[crate::async_trait]
impl Transport for HiddenServiceTransport {
type Output = <SocksTransport as Transport>::Output;
type Error = <SocksTransport as Transport>::Error;
type Listener = <SocksTransport as Transport>::Listener;

async fn listen(&self, addr: &Multiaddr) -> Result<(Self::Listener, Multiaddr), Self::Error> {
self.ensure_initialized().await?;
let inner = self.inner.read().await;

// info!(
// target: LOG_TARGET,
// "Tor hidden service initialized. proxied_address = '{:?}'",
// inner.proxied_address(),
// );
Ok(inner.socks_transport.as_ref().unwrap().listen(addr).await?)
}

async fn dial(&self, addr: &Multiaddr) -> Result<Self::Output, Self::Error> {
self.ensure_initialized().await?;
let inner = self.inner.read().await;
Ok(inner.socks_transport.as_ref().unwrap().dial(addr).await?)
}
}
3 changes: 3 additions & 0 deletions comms/core/src/transports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ mod tcp;
pub use tcp::TcpTransport;

mod tcp_with_tor;
mod hidden_service_transport;
pub use hidden_service_transport::HiddenServiceTransport;

pub use tcp_with_tor::TcpWithTorTransport;

/// Defines an abstraction for implementations that can dial and listen for connections over a provided address.
Expand Down

0 comments on commit 1a7000b

Please sign in to comment.