Skip to content

Commit

Permalink
feat: wallet connectivity service
Browse files Browse the repository at this point in the history
Adds a service responsible for wallet connectivity. This service
is responsible for and abstracts any complexity in the management of
the base node connections and RPC session management.

This PR makes use of this service in the base node montoring service but
does not "plumb" the WalletConenctivityService into the protocols. This
is left as a TODO, but we should expect this to remove many lines of
code and greaty simplify these protocols by removing the budren of
connection management in the various wallet components.
  • Loading branch information
sdbondi committed Aug 4, 2021
1 parent 2cfdf7a commit 1930a16
Show file tree
Hide file tree
Showing 28 changed files with 1,149 additions and 298 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::ui::{components::Component, state::AppState};
use tari_wallet::base_node_service::service::OnlineState;
use tari_wallet::connectivity_service::OnlineStatus;
use tui::{
backend::Backend,
layout::Rect,
Expand All @@ -45,17 +45,17 @@ impl<B: Backend> Component<B> for BaseNode {
let base_node_state = app_state.get_base_node_state();

let chain_info = match base_node_state.online {
OnlineState::Connecting => Spans::from(vec![
OnlineStatus::Connecting => Spans::from(vec![
Span::styled("Chain Tip:", Style::default().fg(Color::Magenta)),
Span::raw(" "),
Span::styled("Connecting...", Style::default().fg(Color::Reset)),
]),
OnlineState::Offline => Spans::from(vec![
OnlineStatus::Offline => Spans::from(vec![
Span::styled("Chain Tip:", Style::default().fg(Color::Magenta)),
Span::raw(" "),
Span::styled("Offline", Style::default().fg(Color::Red)),
]),
OnlineState::Online => {
OnlineStatus::Online => {
if let Some(metadata) = base_node_state.clone().chain_metadata {
let tip = metadata.height_of_longest_chain();

Expand Down
91 changes: 38 additions & 53 deletions applications/tari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ use tari_wallet::{
util::emoji::EmojiId,
WalletSqlite,
};
use tokio::sync::{watch, RwLock};
use tokio::{
sync::{watch, RwLock},
task,
};

const LOG_TARGET: &str = "wallet::console_wallet::app_state";

Expand Down Expand Up @@ -649,15 +652,7 @@ impl AppStateInner {
)
.await?;

if let Err(e) = self
.wallet
.transaction_service
.validate_transactions(ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating transactions: {}", e);
}
self.validate_outputs().await;
self.spawn_transaction_revalidation_task();

self.data.base_node_previous = self.data.base_node_selected.clone();
self.data.base_node_selected = peer.clone();
Expand Down Expand Up @@ -685,15 +680,7 @@ impl AppStateInner {
)
.await?;

if let Err(e) = self
.wallet
.transaction_service
.validate_transactions(ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating transactions: {}", e);
}
self.validate_outputs().await;
self.spawn_transaction_revalidation_task();

self.data.base_node_previous = self.data.base_node_selected.clone();
self.data.base_node_selected = peer.clone();
Expand Down Expand Up @@ -735,15 +722,7 @@ impl AppStateInner {
)
.await?;

if let Err(e) = self
.wallet
.transaction_service
.validate_transactions(ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating transactions: {}", e);
}
self.validate_outputs().await;
self.spawn_transaction_revalidation_task();

self.data.base_node_peer_custom = None;
self.data.base_node_selected = previous;
Expand All @@ -762,33 +741,39 @@ impl AppStateInner {
Ok(())
}

pub async fn validate_outputs(&mut self) {
if let Err(e) = self
.wallet
.output_manager_service
.validate_txos(TxoValidationType::Unspent, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating UTXOs: {}", e);
}
pub fn spawn_transaction_revalidation_task(&mut self) {
let mut txn_service = self.wallet.transaction_service.clone();
let mut output_manager_service = self.wallet.output_manager_service.clone();

if let Err(e) = self
.wallet
.output_manager_service
.validate_txos(TxoValidationType::Spent, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating STXOs: {}", e);
}
task::spawn(async move {
if let Err(e) = txn_service
.validate_transactions(ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating transactions: {}", e);
}

if let Err(e) = self
.wallet
.output_manager_service
.validate_txos(TxoValidationType::Invalid, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating Invalid TXOs: {}", e);
}
if let Err(e) = output_manager_service
.validate_txos(TxoValidationType::Unspent, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating UTXOs: {}", e);
}

if let Err(e) = output_manager_service
.validate_txos(TxoValidationType::Spent, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating STXOs: {}", e);
}

if let Err(e) = output_manager_service
.validate_txos(TxoValidationType::Invalid, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating Invalid TXOs: {}", e);
}
});
}
}

Expand Down
3 changes: 3 additions & 0 deletions base_layer/wallet/src/base_node_service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ const LOG_TARGET: &str = "wallet::base_node_service::config";
#[derive(Clone, Debug)]
pub struct BaseNodeServiceConfig {
pub base_node_monitor_refresh_interval: Duration,
pub base_node_rpc_pool_size: usize,
pub request_max_age: Duration,
}

impl Default for BaseNodeServiceConfig {
fn default() -> Self {
Self {
base_node_monitor_refresh_interval: Duration::from_secs(5),
base_node_rpc_pool_size: 10,
request_max_age: Duration::from_secs(60),
}
}
Expand All @@ -51,6 +53,7 @@ impl BaseNodeServiceConfig {
Self {
base_node_monitor_refresh_interval: Duration::from_secs(refresh_interval),
request_max_age: Duration::from_secs(request_max_age),
..Default::default()
}
}
}
4 changes: 3 additions & 1 deletion base_layer/wallet/src/base_node_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::error::WalletStorageError;
use crate::{connectivity_service::WalletConnectivityError, error::WalletStorageError};
use tari_comms::{connectivity::ConnectivityError, protocol::rpc::RpcError};
use tari_comms_dht::outbound::DhtOutboundError;
use tari_service_framework::reply_channel::TransportChannelError;
Expand All @@ -46,4 +46,6 @@ pub enum BaseNodeServiceError {
InvalidBaseNodeResponse(String),
#[error("Wallet storage error: `{0}`")]
WalletStorageError(#[from] WalletStorageError),
#[error("Wallet connectivity error: `{0}`")]
WalletConnectivityError(#[from] WalletConnectivityError),
}
6 changes: 2 additions & 4 deletions base_layer/wallet/src/base_node_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@

use super::{error::BaseNodeServiceError, service::BaseNodeState};
use futures::{stream::Fuse, StreamExt};
use std::sync::Arc;
use tari_comms::peer_manager::Peer;

use std::time::Duration;
use std::{sync::Arc, time::Duration};
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::Peer;
use tari_service_framework::reply_channel::SenderService;
use tokio::sync::broadcast;
use tower::Service;
Expand Down
17 changes: 10 additions & 7 deletions base_layer/wallet/src/base_node_service/mock_base_node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::base_node_service::{
error::BaseNodeServiceError,
handle::{BaseNodeServiceRequest, BaseNodeServiceResponse},
service::{BaseNodeState, OnlineState},
use crate::{
base_node_service::{
error::BaseNodeServiceError,
handle::{BaseNodeServiceRequest, BaseNodeServiceResponse},
service::BaseNodeState,
},
connectivity_service::OnlineStatus,
};
use futures::StreamExt;
use tari_common_types::chain_metadata::ChainMetadata;
Expand Down Expand Up @@ -81,9 +84,9 @@ impl MockBaseNodeService {
let (chain_metadata, is_synced, online) = match height {
Some(height) => {
let metadata = ChainMetadata::new(height, Vec::new(), 0, 0, 0);
(Some(metadata), Some(true), OnlineState::Online)
(Some(metadata), Some(true), OnlineStatus::Online)
},
None => (None, None, OnlineState::Offline),
None => (None, None, OnlineStatus::Offline),
};
self.state = BaseNodeState {
chain_metadata,
Expand All @@ -102,7 +105,7 @@ impl MockBaseNodeService {
is_synced: Some(true),
updated: None,
latency: None,
online: OnlineState::Online,
online: OnlineStatus::Online,
base_node_peer: None,
}
}
Expand Down
6 changes: 3 additions & 3 deletions base_layer/wallet/src/base_node_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ mod monitor;

use crate::{
base_node_service::{config::BaseNodeServiceConfig, handle::BaseNodeServiceHandle, service::BaseNodeService},
connectivity_service::WalletConnectivityHandle,
storage::database::{WalletBackend, WalletDatabase},
};
use log::*;
use tari_comms::connectivity::ConnectivityRequester;
use tari_service_framework::{
async_trait,
reply_channel,
Expand Down Expand Up @@ -80,12 +80,12 @@ where T: WalletBackend + 'static
let db = self.db.clone();

context.spawn_when_ready(move |handles| async move {
let connectivity_manager = handles.expect_handle::<ConnectivityRequester>();
let wallet_connectivity = handles.expect_handle::<WalletConnectivityHandle>();

let service = BaseNodeService::new(
config,
request_stream,
connectivity_manager,
wallet_connectivity,
event_publisher,
handles.get_shutdown_signal(),
db,
Expand Down
Loading

0 comments on commit 1930a16

Please sign in to comment.