From e6d4c87300afd8d28a2d356890fbb2978432fe8f Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Wed, 6 Jul 2022 12:25:11 +0200 Subject: [PATCH] Keep track of checkpoint_number in consensus worker --- .../src/grpc/services/wallet_client.rs | 5 +-- .../core/src/services/checkpoint_manager.rs | 36 ++++++++----------- .../src/services/service_specification.rs | 10 ++++-- .../core/src/workers/consensus_worker.rs | 15 +++++--- 4 files changed, 35 insertions(+), 31 deletions(-) diff --git a/applications/tari_validator_node/src/grpc/services/wallet_client.rs b/applications/tari_validator_node/src/grpc/services/wallet_client.rs index 4dd1c45710..8c8d6feb68 100644 --- a/applications/tari_validator_node/src/grpc/services/wallet_client.rs +++ b/applications/tari_validator_node/src/grpc/services/wallet_client.rs @@ -23,7 +23,6 @@ use std::net::SocketAddr; use async_trait::async_trait; -use log::*; use tari_app_grpc::{ tari_rpc as grpc, tari_rpc::{ @@ -39,7 +38,7 @@ use tari_crypto::tari_utilities::ByteArray; use tari_dan_core::{services::WalletClient, DigitalAssetError}; use tari_dan_engine::state::models::StateRoot; -const LOG_TARGET: &str = "tari::dan::wallet_grpc"; +const _LOG_TARGET: &str = "tari::dan::wallet_grpc"; type Inner = grpc::wallet_client::WalletClient; @@ -80,8 +79,6 @@ impl WalletClient for GrpcWalletClient { signatures: checkpoint_signatures.into_iter().map(Into::into).collect(), }; - info!(target: LOG_TARGET, "✅ Creating checkpoint #{}", checkpoint_number); - if checkpoint_number == 0 { let request = CreateInitialAssetCheckpointRequest { contract_id: contract_id.to_vec(), diff --git a/dan_layer/core/src/services/checkpoint_manager.rs b/dan_layer/core/src/services/checkpoint_manager.rs index d60a025d14..e4f1c308ae 100644 --- a/dan_layer/core/src/services/checkpoint_manager.rs +++ b/dan_layer/core/src/services/checkpoint_manager.rs @@ -33,6 +33,7 @@ const LOG_TARGET: &str = "tari::dan::checkpoint_manager"; pub trait CheckpointManager { async fn create_checkpoint( &mut self, + checkpoint_number: u64, state_root: StateRoot, signature: Vec, ) -> Result<(), DigitalAssetError>; @@ -42,8 +43,6 @@ pub trait CheckpointManager { pub struct ConcreteCheckpointManager { asset_definition: AssetDefinition, wallet: TWallet, - num_calls: u32, - checkpoint_interval: u32, } impl ConcreteCheckpointManager { @@ -51,8 +50,6 @@ impl ConcreteCheckpointManager { Self { asset_definition, wallet, - num_calls: 0, - checkpoint_interval: 100, } } } @@ -61,26 +58,23 @@ impl ConcreteCheckpointManager { impl CheckpointManager for ConcreteCheckpointManager { async fn create_checkpoint( &mut self, + checkpoint_number: u64, state_root: StateRoot, signatures: Vec, ) -> Result<(), DigitalAssetError> { - if self.num_calls == 0 || self.num_calls >= self.checkpoint_interval { - // TODO: fetch and increment checkpoint number - let checkpoint_number = u64::from(self.num_calls / self.checkpoint_interval); - info!( - target: LOG_TARGET, - "Creating checkpoint for contract {}", self.asset_definition.contract_id - ); - self.wallet - .create_new_checkpoint( - &self.asset_definition.contract_id, - &state_root, - checkpoint_number, - signatures, - ) - .await?; - } - self.num_calls += 1; + info!( + target: LOG_TARGET, + "✅ Creating checkpoint #{} for contract {}", checkpoint_number, self.asset_definition.contract_id + ); + + self.wallet + .create_new_checkpoint( + &self.asset_definition.contract_id, + &state_root, + checkpoint_number, + signatures, + ) + .await?; Ok(()) } } diff --git a/dan_layer/core/src/services/service_specification.rs b/dan_layer/core/src/services/service_specification.rs index 5c073ebbc8..7b6586769e 100644 --- a/dan_layer/core/src/services/service_specification.rs +++ b/dan_layer/core/src/services/service_specification.rs @@ -39,7 +39,13 @@ use crate::{ SigningService, ValidatorNodeClientFactory, }, - storage::{chain::ChainDbBackendAdapter, global::GlobalDbBackendAdapter, ChainStorageService, DbFactory}, + storage::{ + chain::{ChainDbBackendAdapter, ChainDbMetadataKey}, + global::GlobalDbBackendAdapter, + ChainStorageService, + DbFactory, + MetadataBackendAdapter, + }, }; /// A trait to describe a specific configuration of services. This type allows other services to @@ -50,7 +56,7 @@ pub trait ServiceSpecification: Default + Clone { type AssetProcessor: AssetProcessor + Clone; type AssetProxy: AssetProxy + Clone; type BaseNodeClient: BaseNodeClient + Clone; - type ChainDbBackendAdapter: ChainDbBackendAdapter; + type ChainDbBackendAdapter: ChainDbBackendAdapter + MetadataBackendAdapter; type ChainStorageService: ChainStorageService; type CheckpointManager: CheckpointManager; type CommitteeManager: CommitteeManager; diff --git a/dan_layer/core/src/workers/consensus_worker.rs b/dan_layer/core/src/workers/consensus_worker.rs index 61b25de7c6..ef016ec49c 100644 --- a/dan_layer/core/src/workers/consensus_worker.rs +++ b/dan_layer/core/src/workers/consensus_worker.rs @@ -271,6 +271,7 @@ impl<'a, T: ServiceSpecification> ConsensusWorkerProcessor<'a, } async fn commit(&mut self) -> Result { + let current_checkpoint_num = self.chain_db.get_current_checkpoint_number()?; let mut unit_of_work = self.chain_db.new_unit_of_work(); let mut state = states::CommitState::::new( self.worker.node_address.clone(), @@ -285,6 +286,7 @@ impl<'a, T: ServiceSpecification> ConsensusWorkerProcessor<'a, &mut self.worker.outbound_service, &self.worker.signing_service, unit_of_work.clone(), + current_checkpoint_num, ) .await?; unit_of_work.commit()?; @@ -313,10 +315,15 @@ impl<'a, T: ServiceSpecification> ConsensusWorkerProcessor<'a, if let Some(mut state_tx) = self.worker.state_db_unit_of_work.take() { state_tx.commit()?; let signatures = state.collected_checkpoint_signatures(); - self.worker - .checkpoint_manager - .create_checkpoint(state_tx.calculate_root()?, signatures) - .await?; + // TODO: Read checkpoint interval from constitution + if self.worker.current_view_id.as_u64() % 10 == 0 { + let checkpoint_number = self.chain_db.get_current_checkpoint_number()?; + self.worker + .checkpoint_manager + .create_checkpoint(checkpoint_number, state_tx.calculate_root()?, signatures) + .await?; + self.chain_db.increment_checkpoint_number()?; + } Ok(res) } else { // technically impossible