From f342714ed1ca4f8451a36e83d36fcf1214af58c4 Mon Sep 17 00:00:00 2001 From: brianp Date: Tue, 17 May 2022 12:24:53 +0200 Subject: [PATCH 1/8] WIP --- applications/tari_validator_node/src/config.rs | 2 ++ common/config/presets/validator_node.toml | 1 + 2 files changed, 3 insertions(+) diff --git a/applications/tari_validator_node/src/config.rs b/applications/tari_validator_node/src/config.rs index 8169a10ca5..35a8e8c8fe 100644 --- a/applications/tari_validator_node/src/config.rs +++ b/applications/tari_validator_node/src/config.rs @@ -65,6 +65,7 @@ pub struct ValidatorNodeConfig { pub assets_allow_list: Option>, pub data_dir: PathBuf, pub p2p: P2pConfig, + pub committee_auto_accept: bool, pub committee_management_polling_interval: u64, pub committee_management_confirmation_time: u64, pub grpc_address: Option, @@ -101,6 +102,7 @@ impl Default for ValidatorNodeConfig { new_asset_scanning_interval: 10, assets_allow_list: None, data_dir: PathBuf::from("/data/validator_node"), + committee_auto_accept: false, committee_management_confirmation_time: 10, committee_management_polling_interval: 5, p2p, diff --git a/common/config/presets/validator_node.toml b/common/config/presets/validator_node.toml index 4f5950e5cd..6e43befb7b 100644 --- a/common/config/presets/validator_node.toml +++ b/common/config/presets/validator_node.toml @@ -15,5 +15,6 @@ new_asset_scanning_interval = 10 # If set then only the specific assets will be checked. # assets_allow_list = [""] +committee_auto_accept = false committee_management_polling_interval = 5 committee_management_confirmation_time = 20 \ No newline at end of file From 8da4dd5e7bca4bee591862cfbbc2f862f0d2f708 Mon Sep 17 00:00:00 2001 From: brianp Date: Wed, 8 Jun 2022 23:39:34 +0200 Subject: [PATCH 2/8] Update naming --- applications/tari_app_grpc/proto/validator_node.proto | 4 ++-- .../src/grpc/services/base_node_client.rs | 2 +- .../src/grpc/validator_node_grpc_server.rs | 8 ++++---- dan_layer/core/src/services/base_node_client.rs | 2 +- dan_layer/core/src/services/mocks/mod.rs | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/applications/tari_app_grpc/proto/validator_node.proto b/applications/tari_app_grpc/proto/validator_node.proto index 493b59ecc5..2725a55ed3 100644 --- a/applications/tari_app_grpc/proto/validator_node.proto +++ b/applications/tari_app_grpc/proto/validator_node.proto @@ -31,11 +31,11 @@ service ValidatorNode { // rpc ExecuteInstruction(ExecuteInstructionRequest) returns (ExecuteInstructionResponse); rpc InvokeReadMethod(InvokeReadMethodRequest) returns (InvokeReadMethodResponse); rpc InvokeMethod(InvokeMethodRequest) returns (InvokeMethodResponse); - rpc GetCommitteeRequests(GetCommitteeRequestsRequest) returns (stream TransactionOutput); + rpc GetConstitutionRequests(GetConstitutionRequestsRequest) returns (stream TransactionOutput); rpc PublishContractAcceptance(PublishContractAcceptanceRequest) returns (PublishContractAcceptanceResponse); } -message GetCommitteeRequestsRequest { +message GetConstitutionRequestsRequest { // empty } diff --git a/applications/tari_validator_node/src/grpc/services/base_node_client.rs b/applications/tari_validator_node/src/grpc/services/base_node_client.rs index 7645042e31..803eb06c55 100644 --- a/applications/tari_validator_node/src/grpc/services/base_node_client.rs +++ b/applications/tari_validator_node/src/grpc/services/base_node_client.rs @@ -100,7 +100,7 @@ impl BaseNodeClient for GrpcBaseNodeClient { Ok(output) } - async fn check_for_constitutions_for_me( + async fn 
get_constitutions( &mut self, dan_node_public_key: PublicKey, ) -> Result, DigitalAssetError> { diff --git a/applications/tari_validator_node/src/grpc/validator_node_grpc_server.rs b/applications/tari_validator_node/src/grpc/validator_node_grpc_server.rs index 7cc660b7cc..aa3fc04d7a 100644 --- a/applications/tari_validator_node/src/grpc/validator_node_grpc_server.rs +++ b/applications/tari_validator_node/src/grpc/validator_node_grpc_server.rs @@ -69,7 +69,7 @@ impl ValidatorNodeGrpcServer rpc::validator_node_server::ValidatorNode for ValidatorNodeGrpcServer { - type GetCommitteeRequestsStream = mpsc::Receiver>; + type GetConstitutionRequestsStream = mpsc::Receiver>; async fn publish_contract_acceptance( &self, @@ -96,10 +96,10 @@ impl rpc::validator_node_ } } - async fn get_committee_requests( + async fn get_constitution_requests( &self, - _request: tonic::Request, - ) -> Result, tonic::Status> { + _request: tonic::Request, + ) -> Result, tonic::Status> { let (mut _sender, receiver) = mpsc::channel(100); task::spawn(async move { let mut _test = 1u64; diff --git a/dan_layer/core/src/services/base_node_client.rs b/dan_layer/core/src/services/base_node_client.rs index ba00c94f7d..85eec182d4 100644 --- a/dan_layer/core/src/services/base_node_client.rs +++ b/dan_layer/core/src/services/base_node_client.rs @@ -40,7 +40,7 @@ pub trait BaseNodeClient: Send + Sync { checkpoint_unique_id: Vec, ) -> Result, DigitalAssetError>; - async fn check_for_constitutions_for_me( + async fn get_constitutions( &mut self, dan_node_public_key: PublicKey, ) -> Result, DigitalAssetError>; diff --git a/dan_layer/core/src/services/mocks/mod.rs b/dan_layer/core/src/services/mocks/mod.rs index 36a7a81c38..cc6b214d1c 100644 --- a/dan_layer/core/src/services/mocks/mod.rs +++ b/dan_layer/core/src/services/mocks/mod.rs @@ -221,7 +221,7 @@ impl BaseNodeClient for MockBaseNodeClient { todo!() } - async fn check_for_constitutions_for_me( + async fn get_constitutions( &mut self, _dan_node_public_key: PublicKey, ) -> Result, DigitalAssetError> { From f2341e11ed0e44fabe7dd0dfc9c4471d60aa87ff Mon Sep 17 00:00:00 2001 From: brianp Date: Wed, 8 Jun 2022 23:40:18 +0200 Subject: [PATCH 3/8] Silence dead code warnings --- applications/tari_validator_node/src/asset.rs | 2 ++ applications/tari_validator_node/src/monitoring.rs | 2 ++ .../src/p2p/services/inbound_connection_service.rs | 3 +++ .../src/p2p/services/outbound_connection_service.rs | 1 + 4 files changed, 8 insertions(+) diff --git a/applications/tari_validator_node/src/asset.rs b/applications/tari_validator_node/src/asset.rs index e88195ef69..096d5977a1 100644 --- a/applications/tari_validator_node/src/asset.rs +++ b/applications/tari_validator_node/src/asset.rs @@ -31,6 +31,7 @@ use std::{ use tari_dan_core::models::AssetDefinition; #[derive(Debug)] +#[allow(dead_code)] pub struct Asset { definition: AssetDefinition, current_state: bool, @@ -40,6 +41,7 @@ pub struct Asset { kill_signal: Option>, } +#[allow(dead_code)] impl Asset { pub fn new(definition: AssetDefinition) -> Self { Self { diff --git a/applications/tari_validator_node/src/monitoring.rs b/applications/tari_validator_node/src/monitoring.rs index aef8656953..04ae6175a4 100644 --- a/applications/tari_validator_node/src/monitoring.rs +++ b/applications/tari_validator_node/src/monitoring.rs @@ -31,11 +31,13 @@ use tari_dan_core::models::AssetDefinition; use crate::asset::Asset; #[derive(Debug)] +#[allow(dead_code)] pub struct Monitoring { committee_management_confirmation_time: u64, assets: HashMap, } 
+#[allow(dead_code)] impl Monitoring { pub fn new(committee_management_confirmation_time: u64) -> Self { Self { diff --git a/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs b/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs index 95e864c0b1..d2153b43a2 100644 --- a/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs +++ b/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs @@ -54,6 +54,7 @@ enum WaitForMessageType { } #[derive(Debug)] +#[allow(dead_code)] enum TariCommsInboundRequest { WaitForMessage { wait_for_type: WaitForMessageType, @@ -63,6 +64,7 @@ enum TariCommsInboundRequest { }, } +#[allow(dead_code)] pub struct TariCommsInboundConnectionService { receiver: TariCommsInboundReceiverHandle, // sender: Sender<(CommsPublicKey, HotStuffMessage)>, @@ -81,6 +83,7 @@ pub struct TariCommsInboundConnectionService { loopback_receiver: Receiver<(CommsPublicKey, HotStuffMessage)>, } +#[allow(dead_code)] impl TariCommsInboundConnectionService { pub fn new(asset_public_key: PublicKey) -> Self { let (sender, receiver) = channel(1000); diff --git a/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs b/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs index e9aad49839..f19dfc779d 100644 --- a/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs +++ b/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs @@ -48,6 +48,7 @@ pub struct TariCommsOutboundService { } impl TariCommsOutboundService { + #[allow(dead_code)] pub fn new( outbound_message_requester: OutboundMessageRequester, loopback_service: Sender<(CommsPublicKey, HotStuffMessage)>, From 7fa0f9b55a46e9698ea888fc070183ef39c12686 Mon Sep 17 00:00:00 2001 From: brianp Date: Wed, 8 Jun 2022 23:40:52 +0200 Subject: [PATCH 4/8] Add new settings --- applications/tari_validator_node/src/config.rs | 12 ++++++------ common/config/presets/validator_node.toml | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/applications/tari_validator_node/src/config.rs b/applications/tari_validator_node/src/config.rs index 35a8e8c8fe..2607b4cd4c 100644 --- a/applications/tari_validator_node/src/config.rs +++ b/applications/tari_validator_node/src/config.rs @@ -65,9 +65,9 @@ pub struct ValidatorNodeConfig { pub assets_allow_list: Option>, pub data_dir: PathBuf, pub p2p: P2pConfig, - pub committee_auto_accept: bool, - pub committee_management_polling_interval: u64, - pub committee_management_confirmation_time: u64, + pub constitution_auto_accept: bool, + pub constitution_management_polling_interval: u64, + pub constitution_management_confirmation_time: u64, pub grpc_address: Option, } @@ -102,9 +102,9 @@ impl Default for ValidatorNodeConfig { new_asset_scanning_interval: 10, assets_allow_list: None, data_dir: PathBuf::from("/data/validator_node"), - committee_auto_accept: false, - committee_management_confirmation_time: 10, - committee_management_polling_interval: 5, + constitution_auto_accept: false, + constitution_management_confirmation_time: 10, + constitution_management_polling_interval: 5, p2p, grpc_address: Some("/ip4/127.0.0.1/tcp/18144".parse().unwrap()), } diff --git a/common/config/presets/validator_node.toml b/common/config/presets/validator_node.toml index 6e43befb7b..a2787fd467 100644 --- a/common/config/presets/validator_node.toml +++ b/common/config/presets/validator_node.toml @@ -15,6 +15,6 @@ 
new_asset_scanning_interval = 10 # If set then only the specific assets will be checked. # assets_allow_list = [""] -committee_auto_accept = false -committee_management_polling_interval = 5 -committee_management_confirmation_time = 20 \ No newline at end of file +constitution_auto_accept = false +constitution_management_polling_interval = 5 +constitution_management_confirmation_time = 20 \ No newline at end of file From 1554f54a1eb6ee5e49bd37542a8c1a894d174727 Mon Sep 17 00:00:00 2001 From: brianp Date: Wed, 8 Jun 2022 23:42:19 +0200 Subject: [PATCH 5/8] change to info lines --- applications/tari_validator_node/src/main.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/applications/tari_validator_node/src/main.rs b/applications/tari_validator_node/src/main.rs index 7f2f05bde1..682e413ea4 100644 --- a/applications/tari_validator_node/src/main.rs +++ b/applications/tari_validator_node/src/main.rs @@ -150,8 +150,8 @@ async fn run_node(config: &ApplicationConfig) -> Result<(), ExitError> { if let Some(address) = config.validator_node.grpc_address.clone() { task::spawn(run_grpc(grpc_server, address, shutdown.to_signal())); } - println!("🚀 Validator node started!"); - println!("{}", node_identity); + info!("🚀 Validator node started!"); + info!("{}", node_identity); run_dan_node( shutdown.to_signal(), config.validator_node.clone(), @@ -199,7 +199,7 @@ async fn run_grpc( grpc_address: Multiaddr, shutdown_signal: ShutdownSignal, ) -> Result<(), anyhow::Error> { - println!("Starting GRPC on {}", grpc_address); + info!("Starting GRPC on {}", grpc_address); info!(target: LOG_TARGET, "Starting GRPC on {}", grpc_address); let grpc_address = multiaddr_to_socketaddr(&grpc_address)?; @@ -213,7 +213,7 @@ async fn run_grpc( err })?; - println!("Stopping GRPC"); + info!("Stopping GRPC"); info!(target: LOG_TARGET, "Stopping GRPC"); Ok(()) } From 4b0219aef26b3036376f6ce466c5fd03dd07455d Mon Sep 17 00:00:00 2001 From: brianp Date: Wed, 8 Jun 2022 23:43:14 +0200 Subject: [PATCH 6/8] Simplify as much as possible and implement auto acceptance --- .../tari_validator_node/src/dan_node.rs | 271 +++++------------- applications/tari_validator_node/src/main.rs | 37 +-- 2 files changed, 71 insertions(+), 237 deletions(-) diff --git a/applications/tari_validator_node/src/dan_node.rs b/applications/tari_validator_node/src/dan_node.rs index 770edde021..4927c027c4 100644 --- a/applications/tari_validator_node/src/dan_node.rs +++ b/applications/tari_validator_node/src/dan_node.rs @@ -20,238 +20,99 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-use std::{ - sync::{atomic::AtomicBool, Arc}, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; -use log::*; +use log::{error, info}; use tari_common::exit_codes::{ExitCode, ExitError}; -use tari_common_types::types::PublicKey; -use tari_comms::{types::CommsPublicKey, NodeIdentity}; -use tari_comms_dht::Dht; -use tari_crypto::tari_utilities::hex::Hex; -use tari_dan_core::{ - models::{AssetDefinition, Committee}, - services::{ - BaseNodeClient, - ConcreteAssetProcessor, - ConcreteCheckpointManager, - ConcreteCommitteeManager, - LoggingEventsPublisher, - MempoolServiceHandle, - NodeIdentitySigningService, - TariDanPayloadProcessor, - TariDanPayloadProvider, - }, - workers::ConsensusWorker, -}; -use tari_dan_storage_sqlite::{SqliteDbFactory, SqliteStorageService}; -use tari_p2p::{comms_connector::SubscriptionFactory, tari_message::TariMessageType}; -use tari_service_framework::ServiceHandles; -use tari_shutdown::ShutdownSignal; +use tari_common_types::types::Signature; +use tari_comms::NodeIdentity; +use tari_dan_core::services::{BaseNodeClient, WalletClient}; use tokio::{task, time}; use crate::{ config::ValidatorNodeConfig, - default_service_specification::DefaultServiceSpecification, grpc::services::{base_node_client::GrpcBaseNodeClient, wallet_client::GrpcWalletClient}, - monitoring::Monitoring, - p2p::services::{ - inbound_connection_service::TariCommsInboundConnectionService, - outbound_connection_service::TariCommsOutboundService, - }, - TariCommsValidatorNodeClientFactory, }; -const LOG_TARGET: &str = "tari::validator_node::app"; +const _LOG_TARGET: &str = "tari::validator_node::app"; +#[derive(Clone)] pub struct DanNode { config: ValidatorNodeConfig, + identity: Arc, } impl DanNode { - pub fn new(config: ValidatorNodeConfig) -> Self { - Self { config } + pub fn new(config: ValidatorNodeConfig, identity: Arc) -> Self { + Self { config, identity } } - pub async fn start( - &self, - shutdown: ShutdownSignal, - node_identity: Arc, - mempool_service: MempoolServiceHandle, - db_factory: SqliteDbFactory, - handles: ServiceHandles, - subscription_factory: SubscriptionFactory, - ) -> Result<(), ExitError> { + pub async fn start(&self) -> Result<(), ExitError> { let mut base_node_client = GrpcBaseNodeClient::new(self.config.base_node_grpc_address); - let mut next_scanned_height = 0u64; let mut last_tip = 0u64; - let mut monitoring = Monitoring::new(self.config.committee_management_confirmation_time); - loop { - let tip = base_node_client - .get_tip_info() - .await - .map_err(|e| ExitError::new(ExitCode::DigitalAssetError, &e))?; - if tip.height_of_longest_chain >= next_scanned_height { - info!( - target: LOG_TARGET, - "Scanning base layer (tip : {}) for new assets", tip.height_of_longest_chain - ); - if self.config.scan_for_assets { - next_scanned_height = - tip.height_of_longest_chain + self.config.committee_management_polling_interval; - info!(target: LOG_TARGET, "Next scanning height {}", next_scanned_height); - } else { - next_scanned_height = u64::MAX; // Never run again. 
-                }
-                let mut assets = base_node_client
-                    .get_assets_for_dan_node(node_identity.public_key().clone())
-                    .await
-                    .map_err(|e| ExitError::new(ExitCode::DigitalAssetError, e))?;
-                info!(
-                    target: LOG_TARGET,
-                    "Base node returned {} asset(s) to process",
-                    assets.len()
-                );
-                if let Some(allow_list) = &self.config.assets_allow_list {
-                    assets.retain(|(asset, _)| allow_list.contains(&asset.public_key.to_hex()));
-                }
-                for (asset, mined_height) in assets.clone() {
-                    monitoring.add_if_unmonitored(asset.clone());
-                    monitoring.add_state(asset.public_key, mined_height, true);
-                }
-                let mut known_active_public_keys = assets.into_iter().map(|(asset, _)| asset.public_key);
-                let active_public_keys = monitoring
-                    .get_active_public_keys()
-                    .into_iter()
-                    .cloned()
-                    .collect::>();
-                for public_key in active_public_keys {
-                    if !known_active_public_keys.any(|pk| pk == public_key) {
-                        // Active asset is not part of the newly known active assets, maybe there were no checkpoint for
-                        // the asset. Are we still part of the committee?
-                        if let (false, height) = base_node_client
-                            .check_if_in_committee(public_key.clone(), node_identity.public_key().clone())
-                            .await
-                            .unwrap()
-                        {
-                            // We are not part of the latest committee, set the state to false
-                            monitoring.add_state(public_key.clone(), height, false)
-                        }
-                    }
-                }
-            }
-            if tip.height_of_longest_chain > last_tip {
-                last_tip = tip.height_of_longest_chain;
-                monitoring.update_height(last_tip, |asset| {
-                    let node_identity = node_identity.as_ref().clone();
-                    let mempool = mempool_service.clone();
-                    let handles = handles.clone();
-                    let subscription_factory = subscription_factory.clone();
-                    let shutdown = shutdown.clone();
-                    // Create a kill signal for each asset
-                    let kill = Arc::new(AtomicBool::new(false));
-                    let dan_config = self.config.clone();
-                    let db_factory = db_factory.clone();
-                    task::spawn(DanNode::start_asset_worker(
-                        asset,
-                        node_identity,
-                        mempool,
-                        handles,
-                        subscription_factory,
-                        shutdown,
-                        dan_config,
-                        db_factory,
-                        kill.clone(),
-                    ));
-                    kill
-                });
-            }
-            time::sleep(Duration::from_secs(120)).await;
-        }
-    }
+        let node = self.clone();
 
-    pub async fn start_asset_worker(
-        asset_definition: AssetDefinition,
-        node_identity: NodeIdentity,
-        mempool_service: MempoolServiceHandle,
-        handles: ServiceHandles,
-        subscription_factory: SubscriptionFactory,
-        shutdown: ShutdownSignal,
-        config: ValidatorNodeConfig,
-        db_factory: SqliteDbFactory,
-        kill: Arc,
-    ) -> Result<(), ExitError> {
-        let timeout = Duration::from_secs(asset_definition.phase_timeout);
-        let committee = asset_definition
-            .committee
-            .iter()
-            .map(|s| {
-                CommsPublicKey::from_hex(s)
-                    .map_err(|e| ExitError::new(ExitCode::ConfigError, format!("could not convert to hex:{}", e)))
-            })
-            .collect::, _>>()?;
-
-        let committee = Committee::new(committee);
-        let committee_service = ConcreteCommitteeManager::new(committee);
-
-        let payload_provider = TariDanPayloadProvider::new(mempool_service.clone());
+        if self.config.constitution_auto_accept {
+            task::spawn(async move {
+                loop {
+                    if let Ok(metadata) = base_node_client.get_tip_info().await {
+                        last_tip = metadata.height_of_longest_chain;
+                    }
 
-        let events_publisher = LoggingEventsPublisher::default();
-        let signing_service = NodeIdentitySigningService::new(node_identity.clone());
+                    match node
+                        .find_and_accept_constitutions(base_node_client.clone(), last_tip)
+                        .await
+                    {
+                        Ok(()) => info!("Contracts accepted"),
+                        Err(e) => error!("Contracts not accepted because {:?}", e),
+                    }
 
-        // let _backend =
LmdbAssetStore::initialize(data_dir.join("asset_data"), Default::default()) - // .map_err(|err| ExitCodes::DatabaseError(err.to_string()))?; - // let data_store = AssetDataStore::new(backend); - let asset_processor = ConcreteAssetProcessor::default(); + time::sleep(Duration::from_secs( + node.config.constitution_management_polling_interval, + )) + .await; + } + }); + } - let payload_processor = TariDanPayloadProcessor::new(asset_processor); - let mut inbound = TariCommsInboundConnectionService::new(asset_definition.public_key.clone()); - let receiver = inbound.get_receiver(); + // loop { + // For other work + //} - let loopback = inbound.clone_sender(); - let shutdown_2 = shutdown.clone(); - task::spawn(async move { - let topic_subscription = - subscription_factory.get_subscription(TariMessageType::DanConsensusMessage, "HotStuffMessages"); - inbound.run(shutdown_2, topic_subscription).await - }); - let dht = handles.expect_handle::(); - let outbound = - TariCommsOutboundService::new(dht.outbound_requester(), loopback, asset_definition.public_key.clone()); - let base_node_client = GrpcBaseNodeClient::new(config.base_node_grpc_address); - let chain_storage = SqliteStorageService {}; - let wallet_client = GrpcWalletClient::new(config.wallet_grpc_address); - let checkpoint_manager = ConcreteCheckpointManager::new(asset_definition.clone(), wallet_client); - let validator_node_client_factory = TariCommsValidatorNodeClientFactory::new(dht.dht_requester()); - let mut consensus_worker = ConsensusWorker::::new( - receiver, - outbound, - committee_service, - node_identity.public_key().clone(), - payload_provider, - events_publisher, - signing_service, - payload_processor, - asset_definition, - base_node_client, - timeout, - db_factory, - chain_storage, - checkpoint_manager, - validator_node_client_factory, - ); + Ok(()) + } - if let Err(err) = consensus_worker.run(shutdown.clone(), None, kill).await { - error!(target: LOG_TARGET, "Consensus worker failed with error: {}", err); - return Err(ExitError::new(ExitCode::UnknownError, err)); + async fn find_and_accept_constitutions( + &self, + mut base_node_client: GrpcBaseNodeClient, + last_tip: u64, + ) -> Result<(), ExitError> { + let mut wallet_client = GrpcWalletClient::new(self.config.wallet_grpc_address); + + let outputs = base_node_client + .get_constitutions(self.identity.public_key().clone()) + .await + .map_err(|e| ExitError::new(ExitCode::DigitalAssetError, &e))?; + + for output in outputs { + if let Some(sidechain_features) = output.features.sidechain_features { + let contract_id = sidechain_features.contract_id; + let constitution = sidechain_features.constitution.expect("Constitution wasn't present"); + + if constitution.acceptance_requirements.acceptance_period_expiry < last_tip { + let signature = Signature::default(); + + match wallet_client + .submit_contract_acceptance(&contract_id, self.identity.public_key(), &signature) + .await + { + Ok(tx_id) => info!("Accepted with id={}", tx_id), + Err(_) => error!("Did not accept the contract acceptance"), + }; + }; + } } Ok(()) } - - // async fn start_asset_proxy(&self) -> Result<(), ExitCodes> { - // todo!() - // } } diff --git a/applications/tari_validator_node/src/main.rs b/applications/tari_validator_node/src/main.rs index 682e413ea4..19df452a75 100644 --- a/applications/tari_validator_node/src/main.rs +++ b/applications/tari_validator_node/src/main.rs @@ -52,8 +52,6 @@ use tari_comms::{ use tari_comms_dht::Dht; use tari_dan_core::services::{ConcreteAssetProcessor, ConcreteAssetProxy, 
MempoolServiceHandle, ServiceSpecification}; use tari_dan_storage_sqlite::SqliteDbFactory; -use tari_p2p::comms_connector::SubscriptionFactory; -use tari_service_framework::ServiceHandles; use tari_shutdown::{Shutdown, ShutdownSignal}; use tokio::{runtime, runtime::Runtime, task}; use tonic::transport::Server; @@ -118,7 +116,7 @@ async fn run_node(config: &ApplicationConfig) -> Result<(), ExitError> { node_identity.node_id() ); // fs::create_dir_all(&global.peer_db_path).map_err(|err| ExitError::new(ExitCode::ConfigError, err))?; - let (handles, subscription_factory) = comms::build_service_and_comms_stack( + let (handles, _subscription_factory) = comms::build_service_and_comms_stack( config, shutdown.to_signal(), node_identity.clone(), @@ -152,16 +150,7 @@ async fn run_node(config: &ApplicationConfig) -> Result<(), ExitError> { } info!("🚀 Validator node started!"); info!("{}", node_identity); - run_dan_node( - shutdown.to_signal(), - config.validator_node.clone(), - mempool_service, - db_factory, - handles, - subscription_factory, - node_identity, - ) - .await?; + run_dan_node(config.validator_node.clone(), node_identity).await?; Ok(()) } @@ -173,25 +162,9 @@ fn build_runtime() -> Result { .map_err(|e| ExitError::new(ExitCode::UnknownError, e)) } -async fn run_dan_node( - shutdown_signal: ShutdownSignal, - config: ValidatorNodeConfig, - mempool_service: MempoolServiceHandle, - db_factory: SqliteDbFactory, - handles: ServiceHandles, - subscription_factory: SubscriptionFactory, - node_identity: Arc, -) -> Result<(), ExitError> { - let node = DanNode::new(config); - node.start( - shutdown_signal, - node_identity, - mempool_service, - db_factory, - handles, - subscription_factory, - ) - .await +async fn run_dan_node(config: ValidatorNodeConfig, node_identity: Arc) -> Result<(), ExitError> { + let node = DanNode::new(config, node_identity); + node.start().await } async fn run_grpc( From c8dc7a368940f404489d059319469eb923b22f9b Mon Sep 17 00:00:00 2001 From: brianp Date: Thu, 9 Jun 2022 10:25:01 +0200 Subject: [PATCH 7/8] Make sure the dan node stays running --- applications/tari_validator_node/src/dan_node.rs | 8 ++++---- applications/tari_validator_node/src/main.rs | 9 ++++++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/applications/tari_validator_node/src/dan_node.rs b/applications/tari_validator_node/src/dan_node.rs index 4927c027c4..6f8076ffab 100644 --- a/applications/tari_validator_node/src/dan_node.rs +++ b/applications/tari_validator_node/src/dan_node.rs @@ -75,11 +75,11 @@ impl DanNode { }); } - // loop { - // For other work - //} + loop { + // other work here - Ok(()) + time::sleep(Duration::from_secs(120)).await; + } } async fn find_and_accept_constitutions( diff --git a/applications/tari_validator_node/src/main.rs b/applications/tari_validator_node/src/main.rs index 19df452a75..268ff130a5 100644 --- a/applications/tari_validator_node/src/main.rs +++ b/applications/tari_validator_node/src/main.rs @@ -148,9 +148,12 @@ async fn run_node(config: &ApplicationConfig) -> Result<(), ExitError> { if let Some(address) = config.validator_node.grpc_address.clone() { task::spawn(run_grpc(grpc_server, address, shutdown.to_signal())); } - info!("🚀 Validator node started!"); - info!("{}", node_identity); - run_dan_node(config.validator_node.clone(), node_identity).await?; + + println!("🚀 Validator node started!"); + println!("{}", node_identity); + + run_dan_node(config.validator_node.clone(), node_identity.clone()).await?; + Ok(()) } From 
44c36eadcd46e38e35ca6620ae7f73a1fa686133 Mon Sep 17 00:00:00 2001
From: brianp 
Date: Thu, 9 Jun 2022 11:12:16 +0200
Subject: [PATCH 8/8] Add a failing spec for auto acceptance

We need to work out some other kinks before this will pass but we do want this spec in the future.
---
 integration_tests/features/ValidatorNode.feature | 15 +++++++++++++--
 .../features/support/validator_node_steps.js     |  6 ++++--
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/integration_tests/features/ValidatorNode.feature b/integration_tests/features/ValidatorNode.feature
index 85a7dfcfe8..64f766058f 100644
--- a/integration_tests/features/ValidatorNode.feature
+++ b/integration_tests/features/ValidatorNode.feature
@@ -21,13 +21,24 @@ Feature: Validator Node
     And I create 40 NFTs
     And I mine 3 blocks
 
-  @broken
+  @dan @broken
   Scenario: Publish contract acceptance
     Given I have a seed node NODE1
     And I have wallet WALLET1 connected to all seed nodes
     When I mine 9 blocks using wallet WALLET1 on NODE1
     Then I wait for wallet WALLET1 to have at least 1000000 uT
-    And I have a validator node VN1 connected to base node NODE1 and wallet WALLET1
+    And I have a validator node VN1 connected to base node NODE1 and wallet WALLET1 with "constitution_auto_accept" set to "false"
     Then I publish a contract acceptance transaction for the validator node VN1
     When I mine 4 blocks using wallet WALLET1 on NODE1
     Then wallet WALLET1 has at least 1 transactions that are all TRANSACTION_STATUS_MINED_CONFIRMED and not cancelled
+
+  @dan @broken
+  Scenario: Contract auto acceptance
+    Given I have a seed node NODE1
+    And I have wallet WALLET1 connected to all seed nodes
+    When I mine 9 blocks using wallet WALLET1 on NODE1
+    Then I wait for wallet WALLET1 to have at least 1000000 uT
+    And I have a validator node VN1 connected to base node NODE1 and wallet WALLET1 with "constitution_auto_accept" set to "true"
+    Then I create a "constitution-definition" from file "fixtures/constitution_definition.json" on wallet WALLET1 via command line
+    When I mine 8 blocks using wallet WALLET1 on NODE1
+    Then wallet WALLET1 has at least 2 transactions that are all TRANSACTION_STATUS_MINED_CONFIRMED and not cancelled
diff --git a/integration_tests/features/support/validator_node_steps.js b/integration_tests/features/support/validator_node_steps.js
index 4f05f298b8..2a64560dff 100644
--- a/integration_tests/features/support/validator_node_steps.js
+++ b/integration_tests/features/support/validator_node_steps.js
@@ -123,9 +123,9 @@ Then(
 );
 
 Given(
-  "I have a validator node {word} connected to base node {word} and wallet {word}",
+  "I have a validator node {word} connected to base node {word} and wallet {word} with {word} set to {word}",
   { timeout: 20 * 1000 },
-  async function (vn_name, base_node_name, wallet_name) {
+  async function (vn_name, base_node_name, wallet_name, option_key, option_value) {
     const baseNode = this.getNode(base_node_name);
     const walletNode = this.getWallet(wallet_name);
 
@@ -133,6 +133,8 @@ Given(
     const walletGrpcAddress = `127.0.0.1:${walletNode.getGrpcPort()}`;
 
     const options = {};
+    options[option_key] = option_value;
+
     const danNode = new ValidatorNodeProcess(
      vn_name,
      false,