Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: read asset definitions from base layer #3802

Merged
merged 3 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions applications/tari_app_grpc/proto/wallet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ service Wallet {
rpc GetCompletedTransactions (GetCompletedTransactionsRequest) returns (stream GetCompletedTransactionsResponse);
// Returns the balance
rpc GetBalance (GetBalanceRequest) returns (GetBalanceResponse);
// Returns unspent amounts
rpc GetUnspentAmounts (Empty) returns (GetUnspentAmountsResponse);
// Request the wallet perform a coinsplit
rpc CoinSplit (CoinSplitRequest) returns (CoinSplitResponse);
// Import Utxo to wallet
Expand Down Expand Up @@ -206,6 +208,10 @@ message GetBalanceResponse {
uint64 pending_outgoing_balance = 3;
}

message GetUnspentAmountsResponse {
repeated uint64 amount = 1;
}

message GetCoinbaseRequest {
uint64 reward = 1;
uint64 fee = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,19 @@ impl WalletClient {
debug!(target: LOG_TARGET, "result {:?}", result);
Ok(result.into_inner())
}

pub async fn get_unspent_amounts(
&mut self,
) -> Result<grpc::GetUnspentAmountsResponse, CollectiblesError> {
let inner = self.inner.as_mut().unwrap();
let request = grpc::Empty {};
let result = inner.get_unspent_amounts(request).await.map_err(|source| {
CollectiblesError::ClientRequestError {
request: "get_unspent_amounts".to_string(),
source,
}
})?;
debug!(target: LOG_TARGET, "result {:?}", result);
Ok(result.into_inner())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,16 @@ pub(crate) async fn asset_wallets_get_balance(
Ok(total)
}

#[tauri::command]
pub(crate) async fn asset_wallets_get_unspent_amounts(
state: tauri::State<'_, ConcurrentAppState>,
) -> Result<Vec<u64>, Status> {
let mut client = state.create_wallet_client().await;
client.connect().await?;
let result = client.get_unspent_amounts().await?;
Ok(result.amount)
}

#[tauri::command]
pub(crate) async fn asset_wallets_list(
state: tauri::State<'_, ConcurrentAppState>,
Expand Down
1 change: 1 addition & 0 deletions applications/tari_collectibles/src-tauri/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ fn main() -> Result<(), Box<dyn Error>> {
commands::asset_wallets::asset_wallets_create,
commands::asset_wallets::asset_wallets_list,
commands::asset_wallets::asset_wallets_get_balance,
commands::asset_wallets::asset_wallets_get_unspent_amounts,
commands::asset_wallets::asset_wallets_get_latest_address,
commands::asset_wallets::asset_wallets_create_address,
commands::asset_wallets::asset_wallets_send_to,
Expand Down
5 changes: 5 additions & 0 deletions applications/tari_collectibles/web-app/src/Create.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ class Create extends React.Component {
templateIds.push(721);
}

let outputs = await binding.command_asset_wallets_get_unspent_amounts();

if (outputs.length <= 1) {
throw { message: "You need at least two unspent outputs" };
}
let publicKey = await binding.command_assets_create(
name,
description,
Expand Down
5 changes: 5 additions & 0 deletions applications/tari_collectibles/web-app/src/binding.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ async function command_asset_wallets_get_balance(assetPublicKey) {
return await invoke("asset_wallets_get_balance", { assetPublicKey });
}

async function command_asset_wallets_get_unspent_amounts() {
return await invoke("asset_wallets_get_unspent_amounts", {});
}

const commands = {
command_create_db,
command_assets_create,
Expand All @@ -147,6 +151,7 @@ const commands = {
command_next_asset_public_key,
command_asset_wallets_create,
command_asset_wallets_get_balance,
command_asset_wallets_get_unspent_amounts,
command_asset_wallets_list,
command_asset_wallets_get_latest_address,
command_asset_wallets_create_address,
Expand Down
20 changes: 20 additions & 0 deletions applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use tari_app_grpc::{
GetOwnedAssetsResponse,
GetTransactionInfoRequest,
GetTransactionInfoResponse,
GetUnspentAmountsResponse,
GetVersionRequest,
GetVersionResponse,
ImportUtxosRequest,
Expand Down Expand Up @@ -163,6 +164,25 @@ impl wallet_server::Wallet for WalletGrpcServer {
}))
}

async fn get_unspent_amounts(
&self,
_: Request<tari_rpc::Empty>,
) -> Result<Response<GetUnspentAmountsResponse>, Status> {
let mut output_service = self.get_output_manager_service();
let unspent_amounts;
match output_service.get_unspent_outputs().await {
Ok(uo) => unspent_amounts = uo,
Err(e) => return Err(Status::not_found(format!("GetUnspentAmounts error! {}", e))),
}
Ok(Response::new(GetUnspentAmountsResponse {
amount: unspent_amounts
.into_iter()
.map(|o| o.value.as_u64())
.filter(|&a| a > 0)
.collect(),
}))
}

async fn revalidate_all_transactions(
&self,
_request: Request<RevalidateRequest>,
Expand Down
141 changes: 60 additions & 81 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{fs, fs::File, io::BufReader, path::Path, sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc, time::Duration};

use futures::future::try_join_all;
use log::*;
use log::info;
use tari_common::{configuration::ValidatorNodeConfig, GlobalConfig};
use tari_comms::{types::CommsPublicKey, NodeIdentity};
use tari_comms_dht::Dht;
use tari_crypto::tari_utilities::hex::Hex;
use tari_dan_core::{
models::{AssetDefinition, Committee},
services::{
BaseNodeClient,
ConcreteAssetProcessor,
ConcreteCheckpointManager,
ConcreteCommitteeManager,
Expand All @@ -46,7 +46,7 @@ use tari_dan_storage_sqlite::{SqliteDbFactory, SqliteStorageService};
use tari_p2p::{comms_connector::SubscriptionFactory, tari_message::TariMessageType};
use tari_service_framework::ServiceHandles;
use tari_shutdown::ShutdownSignal;
use tokio::task;
use tokio::{task, time};

use crate::{
default_service_specification::DefaultServiceSpecification,
Expand All @@ -58,7 +58,7 @@ use crate::{
ExitCodes,
};

const LOG_TARGET: &str = "tari::dan::dan_node";
const LOG_TARGET: &str = "tari::validator_node::app";

pub struct DanNode {
config: GlobalConfig,
Expand All @@ -84,84 +84,63 @@ impl DanNode {
.as_ref()
.ok_or_else(|| ExitCodes::ConfigError("Missing dan section".to_string()))?;

let asset_definitions = self.read_asset_definitions(&dan_config.asset_config_directory)?;
if asset_definitions.is_empty() {
warn!(
target: LOG_TARGET,
"No assets to process. Add assets by putting definitions in the `assets` folder with a `.asset` \
extension."
);
}

let mut tasks = vec![];
for asset in asset_definitions {
let node_identitiy = node_identity.as_ref().clone();
let mempool = mempool_service.clone();
let handles = handles.clone();
let subscription_factory = subscription_factory.clone();
let shutdown = shutdown.clone();
let dan_config = dan_config.clone();
let db_factory = db_factory.clone();

tasks.push(task::spawn(async move {
DanNode::start_asset_worker(
asset,
node_identitiy,
mempool,
handles,
subscription_factory,
shutdown,
dan_config,
db_factory,
)
.await
}));
}

if tasks.is_empty() {
// If there are no assets to process, work in proxy mode
tasks.push(task::spawn(DanNode::wait_for_exit()));
}
try_join_all(tasks)
.await
.map_err(|err| ExitCodes::UnknownError(format!("Join error occurred. {}", err)))?
.into_iter()
.collect::<Result<_, _>>()?;

Ok(())
}

fn read_asset_definitions(&self, path: &Path) -> Result<Vec<AssetDefinition>, ExitCodes> {
if !path.exists() {
fs::create_dir_all(path).expect("Could not create dir");
}
let paths = fs::read_dir(path).expect("Could not read asset definitions");

let mut result = vec![];
for path in paths {
let path = path.expect("Not a valid file").path();

if !path.is_dir() && path.extension().unwrap_or_default() == "asset" {
let file = File::open(path).expect("could not open file");
let reader = BufReader::new(file);

let def: AssetDefinition = serde_json::from_reader(reader).expect("lol not a valid json");
result.push(def);
}
}
Ok(result)
}

async fn wait_for_exit() -> Result<(), ExitCodes> {
println!("Type `exit` to exit");
let mut base_node_client = GrpcBaseNodeClient::new(dan_config.base_node_grpc_address);
let mut tasks = HashMap::new();
let mut next_scanned_height = 0u64;
loop {
let mut line = String::new();
let _ = std::io::stdin().read_line(&mut line).expect("Failed to read line");
if line.to_lowercase().trim() == "exit" {
return Err(ExitCodes::UnknownError("User cancelled".to_string()));
} else {
println!("Type `exit` to exit");
let tip = base_node_client.get_tip_info().await.unwrap();
if tip.height_of_longest_chain >= next_scanned_height {
info!(
target: LOG_TARGET,
"Scanning base layer (tip : {}) for new assets", tip.height_of_longest_chain
);
if dan_config.scan_for_assets {
next_scanned_height = tip.height_of_longest_chain + dan_config.new_asset_scanning_interval;
info!(target: LOG_TARGET, "Next scanning height {}", next_scanned_height);
} else {
next_scanned_height = u64::MAX; // Never run again.
}

let assets = base_node_client
.get_assets_for_dan_node(node_identity.public_key().clone())
.await
.unwrap();
for asset in assets {
if tasks.contains_key(&asset.public_key) {
continue;
}
if let Some(allow_list) = &dan_config.assets_allow_list {
if !allow_list.contains(&asset.public_key.to_hex()) {
continue;
}
}
info!(target: LOG_TARGET, "Adding asset {:?}", asset.public_key);
let node_identitiy = node_identity.as_ref().clone();
let mempool = mempool_service.clone();
let handles = handles.clone();
let subscription_factory = subscription_factory.clone();
let shutdown = shutdown.clone();
let dan_config = dan_config.clone();
let db_factory = db_factory.clone();
tasks.insert(
asset.public_key.clone(),
task::spawn(async move {
DanNode::start_asset_worker(
asset.clone(),
node_identitiy,
mempool,
handles,
subscription_factory,
shutdown,
dan_config,
db_factory,
)
.await
}),
);
}
}
time::sleep(Duration::from_secs(120)).await;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tari_app_grpc::tari_rpc as grpc;
use tari_common_types::types::PublicKey;
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_core::{
models::{BaseLayerMetadata, BaseLayerOutput},
models::{AssetDefinition, BaseLayerMetadata, BaseLayerOutput},
services::BaseNodeClient,
DigitalAssetError,
};
Expand Down Expand Up @@ -100,4 +100,48 @@ impl BaseNodeClient for GrpcBaseNodeClient {
.transpose()?;
Ok(output)
}

async fn get_assets_for_dan_node(
&mut self,
dan_node_public_key: PublicKey,
) -> Result<Vec<AssetDefinition>, DigitalAssetError> {
let inner = match self.inner.as_mut() {
Some(i) => i,
None => {
self.connect().await?;
self.inner.as_mut().unwrap()
},
};
let request = grpc::ListAssetRegistrationsRequest { offset: 0, count: 0 };
let mut result = inner.list_asset_registrations(request).await.unwrap().into_inner();
let mut assets: Vec<AssetDefinition> = vec![];
let tip = self.get_tip_info().await?;
while let Some(r) = result.message().await.unwrap() {
if let Ok(asset_public_key) = PublicKey::from_bytes(r.unique_id.as_bytes()) {
if let Some(checkpoint) = self
.get_current_checkpoint(tip.height_of_longest_chain, asset_public_key.clone(), vec![3u8; 32])
.await?
{
if let Some(committee) = checkpoint.get_side_chain_committee() {
if committee.contains(&dan_node_public_key) {
assets.push(AssetDefinition {
public_key: asset_public_key,
template_parameters: r
.features
.unwrap()
.asset
.unwrap()
.template_parameters
.into_iter()
.map(|tp| tp.into())
.collect(),
..Default::default()
});
}
}
}
}
}
Ok(assets)
}
}
7 changes: 7 additions & 0 deletions common/config/presets/validator_node.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,10 @@
committee = ["2ea0df3059caf4411624d6bf5b9c02238d607d2798c586b3e6c2a054da3f205a"] # cannot be of zero size
phase_timeout = 30
template_id = "EditableMetadata"

# If set to false, there will be no scanning at all.
scan_for_assets = true
# How often do we want to scan the base layer for changes.
new_asset_scanning_interval = 10
# If set then only the specific assets will be checked.
# assets_allow_list = ["<pubkey>"]
3 changes: 3 additions & 0 deletions common/src/configuration/validator_node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub struct ValidatorNodeConfig {
pub base_node_grpc_address: SocketAddr,
#[serde(default = "default_wallet_grpc_address")]
pub wallet_grpc_address: SocketAddr,
pub scan_for_assets: bool,
pub new_asset_scanning_interval: u64,
pub assets_allow_list: Option<Vec<String>>,
}

fn default_asset_config_directory() -> PathBuf {
Expand Down
Loading