Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: contract auto acceptance #4177

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions applications/tari_app_grpc/proto/validator_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ service ValidatorNode {
// rpc ExecuteInstruction(ExecuteInstructionRequest) returns (ExecuteInstructionResponse);
rpc InvokeReadMethod(InvokeReadMethodRequest) returns (InvokeReadMethodResponse);
rpc InvokeMethod(InvokeMethodRequest) returns (InvokeMethodResponse);
rpc GetCommitteeRequests(GetCommitteeRequestsRequest) returns (stream TransactionOutput);
rpc GetConstitutionRequests(GetConstitutionRequestsRequest) returns (stream TransactionOutput);
rpc PublishContractAcceptance(PublishContractAcceptanceRequest) returns (PublishContractAcceptanceResponse);
}

message GetCommitteeRequestsRequest {
message GetConstitutionRequestsRequest {
// empty
}

Expand Down
2 changes: 2 additions & 0 deletions applications/tari_validator_node/src/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::{
use tari_dan_core::models::AssetDefinition;

#[derive(Debug)]
#[allow(dead_code)]
pub struct Asset {
definition: AssetDefinition,
current_state: bool,
Expand All @@ -40,6 +41,7 @@ pub struct Asset {
kill_signal: Option<Arc<AtomicBool>>,
}

#[allow(dead_code)]
impl Asset {
pub fn new(definition: AssetDefinition) -> Self {
Self {
Expand Down
10 changes: 6 additions & 4 deletions applications/tari_validator_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ pub struct ValidatorNodeConfig {
pub assets_allow_list: Option<Vec<String>>,
pub data_dir: PathBuf,
pub p2p: P2pConfig,
pub committee_management_polling_interval: u64,
pub committee_management_confirmation_time: u64,
pub constitution_auto_accept: bool,
pub constitution_management_polling_interval: u64,
pub constitution_management_confirmation_time: u64,
pub grpc_address: Option<Multiaddr>,
}

Expand Down Expand Up @@ -101,8 +102,9 @@ impl Default for ValidatorNodeConfig {
new_asset_scanning_interval: 10,
assets_allow_list: None,
data_dir: PathBuf::from("/data/validator_node"),
committee_management_confirmation_time: 10,
committee_management_polling_interval: 5,
constitution_auto_accept: false,
constitution_management_confirmation_time: 10,
constitution_management_polling_interval: 5,
p2p,
grpc_address: Some("/ip4/127.0.0.1/tcp/18144".parse().unwrap()),
}
Expand Down
269 changes: 65 additions & 204 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,238 +20,99 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
use std::{sync::Arc, time::Duration};

use log::*;
use log::{error, info};
use tari_common::exit_codes::{ExitCode, ExitError};
use tari_common_types::types::PublicKey;
use tari_comms::{types::CommsPublicKey, NodeIdentity};
use tari_comms_dht::Dht;
use tari_crypto::tari_utilities::hex::Hex;
use tari_dan_core::{
models::{AssetDefinition, Committee},
services::{
BaseNodeClient,
ConcreteAssetProcessor,
ConcreteCheckpointManager,
ConcreteCommitteeManager,
LoggingEventsPublisher,
MempoolServiceHandle,
NodeIdentitySigningService,
TariDanPayloadProcessor,
TariDanPayloadProvider,
},
workers::ConsensusWorker,
};
use tari_dan_storage_sqlite::{SqliteDbFactory, SqliteStorageService};
use tari_p2p::{comms_connector::SubscriptionFactory, tari_message::TariMessageType};
use tari_service_framework::ServiceHandles;
use tari_shutdown::ShutdownSignal;
use tari_common_types::types::Signature;
use tari_comms::NodeIdentity;
use tari_dan_core::services::{BaseNodeClient, WalletClient};
use tokio::{task, time};

use crate::{
config::ValidatorNodeConfig,
default_service_specification::DefaultServiceSpecification,
grpc::services::{base_node_client::GrpcBaseNodeClient, wallet_client::GrpcWalletClient},
monitoring::Monitoring,
p2p::services::{
inbound_connection_service::TariCommsInboundConnectionService,
outbound_connection_service::TariCommsOutboundService,
},
TariCommsValidatorNodeClientFactory,
};

const LOG_TARGET: &str = "tari::validator_node::app";
const _LOG_TARGET: &str = "tari::validator_node::app";

#[derive(Clone)]
pub struct DanNode {
config: ValidatorNodeConfig,
identity: Arc<NodeIdentity>,
}

impl DanNode {
pub fn new(config: ValidatorNodeConfig) -> Self {
Self { config }
pub fn new(config: ValidatorNodeConfig, identity: Arc<NodeIdentity>) -> Self {
Self { config, identity }
}

pub async fn start(
&self,
shutdown: ShutdownSignal,
node_identity: Arc<NodeIdentity>,
mempool_service: MempoolServiceHandle,
db_factory: SqliteDbFactory,
handles: ServiceHandles,
subscription_factory: SubscriptionFactory,
) -> Result<(), ExitError> {
pub async fn start(&self) -> Result<(), ExitError> {
let mut base_node_client = GrpcBaseNodeClient::new(self.config.base_node_grpc_address);
let mut next_scanned_height = 0u64;
let mut last_tip = 0u64;
let mut monitoring = Monitoring::new(self.config.committee_management_confirmation_time);
loop {
let tip = base_node_client
.get_tip_info()
.await
.map_err(|e| ExitError::new(ExitCode::DigitalAssetError, &e))?;
if tip.height_of_longest_chain >= next_scanned_height {
info!(
target: LOG_TARGET,
"Scanning base layer (tip : {}) for new assets", tip.height_of_longest_chain
);
if self.config.scan_for_assets {
next_scanned_height =
tip.height_of_longest_chain + self.config.committee_management_polling_interval;
info!(target: LOG_TARGET, "Next scanning height {}", next_scanned_height);
} else {
next_scanned_height = u64::MAX; // Never run again.
}
let mut assets = base_node_client
.get_assets_for_dan_node(node_identity.public_key().clone())
.await
.map_err(|e| ExitError::new(ExitCode::DigitalAssetError, e))?;
info!(
target: LOG_TARGET,
"Base node returned {} asset(s) to process",
assets.len()
);
if let Some(allow_list) = &self.config.assets_allow_list {
assets.retain(|(asset, _)| allow_list.contains(&asset.public_key.to_hex()));
}
for (asset, mined_height) in assets.clone() {
monitoring.add_if_unmonitored(asset.clone());
monitoring.add_state(asset.public_key, mined_height, true);
}
let mut known_active_public_keys = assets.into_iter().map(|(asset, _)| asset.public_key);
let active_public_keys = monitoring
.get_active_public_keys()
.into_iter()
.cloned()
.collect::<Vec<PublicKey>>();
for public_key in active_public_keys {
if !known_active_public_keys.any(|pk| pk == public_key) {
// Active asset is not part of the newly known active assets, maybe there were no checkpoint for
// the asset. Are we still part of the committee?
if let (false, height) = base_node_client
.check_if_in_committee(public_key.clone(), node_identity.public_key().clone())
.await
.unwrap()
{
// We are not part of the latest committee, set the state to false
monitoring.add_state(public_key.clone(), height, false)
}
let node = self.clone();

if self.config.constitution_auto_accept {
task::spawn(async move {
loop {
if let Ok(metadata) = base_node_client.get_tip_info().await {
last_tip = metadata.height_of_longest_chain;
}

match node
.find_and_accept_constitutions(base_node_client.clone(), last_tip)
.await
{
Ok(()) => info!("Contracts accepted"),
Err(e) => error!("Contracts not accepted becayse {:?}", e),
}

time::sleep(Duration::from_secs(
node.config.constitution_management_polling_interval,
))
.await;
}
}
if tip.height_of_longest_chain > last_tip {
last_tip = tip.height_of_longest_chain;
monitoring.update_height(last_tip, |asset| {
let node_identity = node_identity.as_ref().clone();
let mempool = mempool_service.clone();
let handles = handles.clone();
let subscription_factory = subscription_factory.clone();
let shutdown = shutdown.clone();
// Create a kill signal for each asset
let kill = Arc::new(AtomicBool::new(false));
let dan_config = self.config.clone();
let db_factory = db_factory.clone();
task::spawn(DanNode::start_asset_worker(
asset,
node_identity,
mempool,
handles,
subscription_factory,
shutdown,
dan_config,
db_factory,
kill.clone(),
));
kill
});
}
});
}

loop {
// other work here

time::sleep(Duration::from_secs(120)).await;
}
}

pub async fn start_asset_worker(
asset_definition: AssetDefinition,
node_identity: NodeIdentity,
mempool_service: MempoolServiceHandle,
handles: ServiceHandles,
subscription_factory: SubscriptionFactory,
shutdown: ShutdownSignal,
config: ValidatorNodeConfig,
db_factory: SqliteDbFactory,
kill: Arc<AtomicBool>,
async fn find_and_accept_constitutions(
&self,
mut base_node_client: GrpcBaseNodeClient,
last_tip: u64,
) -> Result<(), ExitError> {
let timeout = Duration::from_secs(asset_definition.phase_timeout);
let committee = asset_definition
.committee
.iter()
.map(|s| {
CommsPublicKey::from_hex(s)
.map_err(|e| ExitError::new(ExitCode::ConfigError, format!("could not convert to hex:{}", e)))
})
.collect::<Result<Vec<_>, _>>()?;

let committee = Committee::new(committee);
let committee_service = ConcreteCommitteeManager::new(committee);

let payload_provider = TariDanPayloadProvider::new(mempool_service.clone());

let events_publisher = LoggingEventsPublisher::default();
let signing_service = NodeIdentitySigningService::new(node_identity.clone());

// let _backend = LmdbAssetStore::initialize(data_dir.join("asset_data"), Default::default())
// .map_err(|err| ExitCodes::DatabaseError(err.to_string()))?;
// let data_store = AssetDataStore::new(backend);
let asset_processor = ConcreteAssetProcessor::default();

let payload_processor = TariDanPayloadProcessor::new(asset_processor);
let mut inbound = TariCommsInboundConnectionService::new(asset_definition.public_key.clone());
let receiver = inbound.get_receiver();

let loopback = inbound.clone_sender();
let shutdown_2 = shutdown.clone();
task::spawn(async move {
let topic_subscription =
subscription_factory.get_subscription(TariMessageType::DanConsensusMessage, "HotStuffMessages");
inbound.run(shutdown_2, topic_subscription).await
});
let dht = handles.expect_handle::<Dht>();
let outbound =
TariCommsOutboundService::new(dht.outbound_requester(), loopback, asset_definition.public_key.clone());
let base_node_client = GrpcBaseNodeClient::new(config.base_node_grpc_address);
let chain_storage = SqliteStorageService {};
let wallet_client = GrpcWalletClient::new(config.wallet_grpc_address);
let checkpoint_manager = ConcreteCheckpointManager::new(asset_definition.clone(), wallet_client);
let validator_node_client_factory = TariCommsValidatorNodeClientFactory::new(dht.dht_requester());
let mut consensus_worker = ConsensusWorker::<DefaultServiceSpecification>::new(
receiver,
outbound,
committee_service,
node_identity.public_key().clone(),
payload_provider,
events_publisher,
signing_service,
payload_processor,
asset_definition,
base_node_client,
timeout,
db_factory,
chain_storage,
checkpoint_manager,
validator_node_client_factory,
);

if let Err(err) = consensus_worker.run(shutdown.clone(), None, kill).await {
error!(target: LOG_TARGET, "Consensus worker failed with error: {}", err);
return Err(ExitError::new(ExitCode::UnknownError, err));
let mut wallet_client = GrpcWalletClient::new(self.config.wallet_grpc_address);

let outputs = base_node_client
.get_constitutions(self.identity.public_key().clone())
.await
.map_err(|e| ExitError::new(ExitCode::DigitalAssetError, &e))?;

for output in outputs {
if let Some(sidechain_features) = output.features.sidechain_features {
let contract_id = sidechain_features.contract_id;
let constitution = sidechain_features.constitution.expect("Constitution wasn't present");

if constitution.acceptance_requirements.acceptance_period_expiry < last_tip {
let signature = Signature::default();

match wallet_client
.submit_contract_acceptance(&contract_id, self.identity.public_key(), &signature)
.await
{
Ok(tx_id) => info!("Accepted with id={}", tx_id),
Err(_) => error!("Did not accept the contract acceptance"),
};
};
}
}

Ok(())
}

// async fn start_asset_proxy(&self) -> Result<(), ExitCodes> {
// todo!()
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl BaseNodeClient for GrpcBaseNodeClient {
Ok(output)
}

async fn check_for_constitutions_for_me(
async fn get_constitutions(
&mut self,
dan_node_public_key: PublicKey,
) -> Result<Vec<TransactionOutput>, DigitalAssetError> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<TServiceSpecification: ServiceSpecification> ValidatorNodeGrpcServer<TServi
impl<TServiceSpecification: ServiceSpecification + 'static> rpc::validator_node_server::ValidatorNode
for ValidatorNodeGrpcServer<TServiceSpecification>
{
type GetCommitteeRequestsStream = mpsc::Receiver<Result<TransactionOutput, tonic::Status>>;
type GetConstitutionRequestsStream = mpsc::Receiver<Result<TransactionOutput, tonic::Status>>;

async fn publish_contract_acceptance(
&self,
Expand All @@ -96,10 +96,10 @@ impl<TServiceSpecification: ServiceSpecification + 'static> rpc::validator_node_
}
}

async fn get_committee_requests(
async fn get_constitution_requests(
&self,
_request: tonic::Request<rpc::GetCommitteeRequestsRequest>,
) -> Result<Response<Self::GetCommitteeRequestsStream>, tonic::Status> {
_request: tonic::Request<rpc::GetConstitutionRequestsRequest>,
) -> Result<Response<Self::GetConstitutionRequestsStream>, tonic::Status> {
let (mut _sender, receiver) = mpsc::channel(100);
task::spawn(async move {
let mut _test = 1u64;
Expand Down
Loading