Skip to content

Commit

Permalink
feat!: add support for specifying custom messages for scanned outputs…
Browse files Browse the repository at this point in the history
… in libwallet (#3871)

Description
---
This PR adds the ability for clients to specify custom text messages to be added to outputs that are recovered or one-sided payments that are scanned from the blockchain.
These messages were hardcoded before by clients should be able to specify their own so that localized messages can be provided in the future. The PR adds the ability to specify these messages when constucting the Utxo Scanner Service and also provides the interface to update these messages while the wallet is running.

In the FFI interface the `wallet_start_recovery` method was updated to accept a custom Recovery Output message. For the One-Sided payment custom message the `wallet_set_one_sided_payment_message` method was added to allow the client to set the One sided payment message after the Wallet has been created.

How Has This Been Tested?
---
While working on this PR it was noted that the testing harness for the Utxo Scanner Service needed some updating and needed tests added that tested the the One-sided payment scanning. While writing these tests a bug was found in the Utxo Scanning logic that meant the tip block was only scanned every second block that was received. This bug was fixed
  • Loading branch information
philipr-za authored Feb 28, 2022
1 parent 8902f97 commit 0d7f8fc
Show file tree
Hide file tree
Showing 14 changed files with 870 additions and 352 deletions.
2 changes: 1 addition & 1 deletion base_layer/wallet/src/connectivity_service/initializer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021, The Tari Project
// Copyright 2022, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet/src/transaction_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use crate::{

/// API Request enum
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum TransactionServiceRequest {
GetPendingInboundTransactions,
GetPendingOutboundTransactions,
Expand Down
34 changes: 31 additions & 3 deletions base_layer/wallet/src/utxo_scanner_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use std::time::Duration;

use tari_comms::peer_manager::NodeId;
use tari_core::transactions::tari_amount::MicroTari;
use tokio::sync::broadcast;
use tokio::sync::{broadcast, watch};

use crate::util::watch::Watch;

#[derive(Debug, Clone)]
pub enum UtxoScannerEvent {
Expand Down Expand Up @@ -60,14 +62,40 @@ pub enum UtxoScannerEvent {
#[derive(Clone)]
pub struct UtxoScannerHandle {
event_sender: broadcast::Sender<UtxoScannerEvent>,
one_sided_message_watch: Watch<String>,
recovery_message_watch: Watch<String>,
}

impl UtxoScannerHandle {
pub fn new(event_sender: broadcast::Sender<UtxoScannerEvent>) -> Self {
UtxoScannerHandle { event_sender }
pub fn new(
event_sender: broadcast::Sender<UtxoScannerEvent>,
one_sided_message_watch: Watch<String>,
recovery_message_watch: Watch<String>,
) -> Self {
UtxoScannerHandle {
event_sender,
one_sided_message_watch,
recovery_message_watch,
}
}

pub fn get_event_receiver(&mut self) -> broadcast::Receiver<UtxoScannerEvent> {
self.event_sender.subscribe()
}

pub fn set_one_sided_payment_message(&mut self, note: String) {
self.one_sided_message_watch.send(note);
}

pub fn set_recovery_message(&mut self, note: String) {
self.recovery_message_watch.send(note);
}

pub(crate) fn get_one_sided_payment_message_watcher(&self) -> watch::Receiver<String> {
self.one_sided_message_watch.get_receiver()
}

pub(crate) fn get_recovery_message_watcher(&self) -> watch::Receiver<String> {
self.recovery_message_watch.get_receiver()
}
}
126 changes: 126 additions & 0 deletions base_layer/wallet/src/utxo_scanner_service/initializer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2022. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::sync::Arc;

use futures::future;
use log::*;
use tari_comms::{connectivity::ConnectivityRequester, NodeIdentity};
use tari_core::transactions::CryptoFactories;
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tokio::sync::broadcast;

use crate::{
base_node_service::handle::BaseNodeServiceHandle,
connectivity_service::{WalletConnectivityHandle, WalletConnectivityInterface},
output_manager_service::handle::OutputManagerHandle,
storage::database::{WalletBackend, WalletDatabase},
transaction_service::handle::TransactionServiceHandle,
util::watch::Watch,
utxo_scanner_service::{
handle::UtxoScannerHandle,
service::UtxoScannerService,
uxto_scanner_service_builder::UtxoScannerMode,
},
};

const LOG_TARGET: &str = "wallet::utxo_scanner_service::initializer";

pub struct UtxoScannerServiceInitializer<T> {
backend: Option<WalletDatabase<T>>,
factories: CryptoFactories,
node_identity: Arc<NodeIdentity>,
}

impl<T> UtxoScannerServiceInitializer<T>
where T: WalletBackend + 'static
{
pub fn new(backend: WalletDatabase<T>, factories: CryptoFactories, node_identity: Arc<NodeIdentity>) -> Self {
Self {
backend: Some(backend),
factories,
node_identity,
}
}
}

#[async_trait]
impl<T> ServiceInitializer for UtxoScannerServiceInitializer<T>
where T: WalletBackend + 'static
{
async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> {
trace!(target: LOG_TARGET, "Utxo scanner initialization");

let (event_sender, _) = broadcast::channel(200);

let recovery_message_watch = Watch::new("Output found on blockchain during Wallet Recovery".to_string());
let one_sided_message_watch = Watch::new("Detected one-sided payment on blockchain".to_string());

let recovery_message_watch_receiver = recovery_message_watch.get_receiver();
let one_sided_message_watch_receiver = one_sided_message_watch.get_receiver();

// Register handle before waiting for handles to be ready
let utxo_scanner_handle =
UtxoScannerHandle::new(event_sender.clone(), one_sided_message_watch, recovery_message_watch);
context.register_handle(utxo_scanner_handle);

let backend = self
.backend
.take()
.expect("Cannot start Utxo scanner service without setting a storage backend");
let factories = self.factories.clone();
let node_identity = self.node_identity.clone();

context.spawn_when_ready(move |handles| async move {
let transaction_service = handles.expect_handle::<TransactionServiceHandle>();
let output_manager_service = handles.expect_handle::<OutputManagerHandle>();
let comms_connectivity = handles.expect_handle::<ConnectivityRequester>();
let wallet_connectivity = handles.expect_handle::<WalletConnectivityHandle>();
let base_node_service_handle = handles.expect_handle::<BaseNodeServiceHandle>();

let scanning_service = UtxoScannerService::<T>::builder()
.with_peers(vec![])
.with_retry_limit(2)
.with_mode(UtxoScannerMode::Scanning)
.build_with_resources(
backend,
comms_connectivity,
wallet_connectivity.get_current_base_node_watcher(),
output_manager_service,
transaction_service,
node_identity,
factories,
handles.get_shutdown_signal(),
event_sender,
base_node_service_handle,
one_sided_message_watch_receiver,
recovery_message_watch_receiver,
)
.run();

futures::pin_mut!(scanning_service);
future::select(scanning_service, handles.get_shutdown_signal()).await;
info!(target: LOG_TARGET, "Utxo scanner service shutdown");
});
Ok(())
}
}
126 changes: 31 additions & 95 deletions base_layer/wallet/src/utxo_scanner_service/mod.rs
Original file line number Diff line number Diff line change
@@ -1,102 +1,38 @@
use std::sync::Arc;

use futures::future;
use log::*;
use tari_comms::{connectivity::ConnectivityRequester, NodeIdentity};
use tari_core::transactions::CryptoFactories;
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tokio::sync::broadcast;

use crate::{
base_node_service::handle::BaseNodeServiceHandle,
connectivity_service::{WalletConnectivityHandle, WalletConnectivityInterface},
output_manager_service::handle::OutputManagerHandle,
storage::database::{WalletBackend, WalletDatabase},
transaction_service::handle::TransactionServiceHandle,
utxo_scanner_service::{
handle::UtxoScannerHandle,
service::UtxoScannerService,
uxto_scanner_service_builder::UtxoScannerMode,
},
};
// Copyright 2022, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

pub mod error;
pub mod handle;
pub mod initializer;
pub mod service;
mod utxo_scanner_task;
pub mod uxto_scanner_service_builder;

pub use utxo_scanner_task::RECOVERY_KEY;

const LOG_TARGET: &str = "wallet::utxo_scanner_service::initializer";

pub struct UtxoScannerServiceInitializer<T> {
backend: Option<WalletDatabase<T>>,
factories: CryptoFactories,
node_identity: Arc<NodeIdentity>,
}

impl<T> UtxoScannerServiceInitializer<T>
where T: WalletBackend + 'static
{
pub fn new(backend: WalletDatabase<T>, factories: CryptoFactories, node_identity: Arc<NodeIdentity>) -> Self {
Self {
backend: Some(backend),
factories,
node_identity,
}
}
}

#[async_trait]
impl<T> ServiceInitializer for UtxoScannerServiceInitializer<T>
where T: WalletBackend + 'static
{
async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> {
trace!(target: LOG_TARGET, "Utxo scanner initialization");

let (event_sender, _) = broadcast::channel(200);

// Register handle before waiting for handles to be ready
let utxo_scanner_handle = UtxoScannerHandle::new(event_sender.clone());
context.register_handle(utxo_scanner_handle);

let backend = self
.backend
.take()
.expect("Cannot start Utxo scanner service without setting a storage backend");
let factories = self.factories.clone();
let node_identity = self.node_identity.clone();

context.spawn_when_ready(move |handles| async move {
let transaction_service = handles.expect_handle::<TransactionServiceHandle>();
let output_manager_service = handles.expect_handle::<OutputManagerHandle>();
let comms_connectivity = handles.expect_handle::<ConnectivityRequester>();
let wallet_connectivity = handles.expect_handle::<WalletConnectivityHandle>();
let base_node_service_handle = handles.expect_handle::<BaseNodeServiceHandle>();

let scanning_service = UtxoScannerService::<T>::builder()
.with_peers(vec![])
.with_retry_limit(2)
.with_mode(UtxoScannerMode::Scanning)
.build_with_resources(
backend,
comms_connectivity,
wallet_connectivity.get_current_base_node_watcher(),
output_manager_service,
transaction_service,
node_identity,
factories,
handles.get_shutdown_signal(),
event_sender,
base_node_service_handle,
)
.run();

futures::pin_mut!(scanning_service);
future::select(scanning_service, handles.get_shutdown_signal()).await;
info!(target: LOG_TARGET, "Utxo scanner service shutdown");
});
Ok(())
}
}
pub const RECOVERY_KEY: &str = "recovery_data";
14 changes: 14 additions & 0 deletions base_layer/wallet/src/utxo_scanner_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ where TBackend: WalletBackend + 'static
pub(crate) shutdown_signal: ShutdownSignal,
pub(crate) event_sender: broadcast::Sender<UtxoScannerEvent>,
pub(crate) base_node_service: BaseNodeServiceHandle,
one_sided_message_watch: watch::Receiver<String>,
recovery_message_watch: watch::Receiver<String>,
}

impl<TBackend> UtxoScannerService<TBackend>
Expand All @@ -78,6 +80,8 @@ where TBackend: WalletBackend + 'static
shutdown_signal: ShutdownSignal,
event_sender: broadcast::Sender<UtxoScannerEvent>,
base_node_service: BaseNodeServiceHandle,
one_sided_message_watch: watch::Receiver<String>,
recovery_message_watch: watch::Receiver<String>,
) -> Self {
Self {
resources,
Expand All @@ -87,6 +91,8 @@ where TBackend: WalletBackend + 'static
shutdown_signal,
event_sender,
base_node_service,
one_sided_message_watch,
recovery_message_watch,
}
}

Expand Down Expand Up @@ -171,6 +177,12 @@ where TBackend: WalletBackend + 'static
info!(target: LOG_TARGET, "UTXO scanning service shutting down because it received the shutdown signal");
return Ok(());
}
Ok(_) = self.one_sided_message_watch.changed() => {
self.resources.one_sided_payment_message = (*self.one_sided_message_watch.borrow()).clone();
},
Ok(_) = self.recovery_message_watch.changed() => {
self.resources.recovery_message = (*self.recovery_message_watch.borrow()).clone();
},
}
}
}
Expand All @@ -186,6 +198,8 @@ pub struct UtxoScannerResources<TBackend> {
pub transaction_service: TransactionServiceHandle,
pub node_identity: Arc<NodeIdentity>,
pub factories: CryptoFactories,
pub recovery_message: String,
pub one_sided_payment_message: String,
}

#[derive(Debug, Clone)]
Expand Down
Loading

0 comments on commit 0d7f8fc

Please sign in to comment.