Skip to content

Commit

Permalink
Keep track of checkpoint_number in consensus worker
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Jul 7, 2022
1 parent 60c7afa commit 09ab905
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
use std::net::SocketAddr;

use async_trait::async_trait;
use log::*;
use tari_app_grpc::{
tari_rpc as grpc,
tari_rpc::{
Expand All @@ -39,7 +38,7 @@ use tari_crypto::tari_utilities::ByteArray;
use tari_dan_core::{services::WalletClient, DigitalAssetError};
use tari_dan_engine::state::models::StateRoot;

const LOG_TARGET: &str = "tari::dan::wallet_grpc";
const _LOG_TARGET: &str = "tari::dan::wallet_grpc";

type Inner = grpc::wallet_client::WalletClient<tonic::transport::Channel>;

Expand Down Expand Up @@ -80,8 +79,6 @@ impl WalletClient for GrpcWalletClient {
signatures: checkpoint_signatures.into_iter().map(Into::into).collect(),
};

info!(target: LOG_TARGET, "✅ Creating checkpoint #{}", checkpoint_number);

if checkpoint_number == 0 {
let request = CreateInitialAssetCheckpointRequest {
contract_id: contract_id.to_vec(),
Expand Down
36 changes: 15 additions & 21 deletions dan_layer/core/src/services/checkpoint_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const LOG_TARGET: &str = "tari::dan::checkpoint_manager";
pub trait CheckpointManager {
async fn create_checkpoint(
&mut self,
checkpoint_number: u64,
state_root: StateRoot,
signature: Vec<SignerSignature>,
) -> Result<(), DigitalAssetError>;
Expand All @@ -42,17 +43,13 @@ pub trait CheckpointManager {
pub struct ConcreteCheckpointManager<TWallet: WalletClient> {
asset_definition: AssetDefinition,
wallet: TWallet,
num_calls: u32,
checkpoint_interval: u32,
}

impl<TWallet: WalletClient> ConcreteCheckpointManager<TWallet> {
pub fn new(asset_definition: AssetDefinition, wallet: TWallet) -> Self {
Self {
asset_definition,
wallet,
num_calls: 0,
checkpoint_interval: 100,
}
}
}
Expand All @@ -61,26 +58,23 @@ impl<TWallet: WalletClient> ConcreteCheckpointManager<TWallet> {
impl<TWallet: WalletClient + Sync + Send> CheckpointManager for ConcreteCheckpointManager<TWallet> {
async fn create_checkpoint(
&mut self,
checkpoint_number: u64,
state_root: StateRoot,
signatures: Vec<SignerSignature>,
) -> Result<(), DigitalAssetError> {
if self.num_calls == 0 || self.num_calls >= self.checkpoint_interval {
// TODO: fetch and increment checkpoint number
let checkpoint_number = u64::from(self.num_calls / self.checkpoint_interval);
info!(
target: LOG_TARGET,
"Creating checkpoint for contract {}", self.asset_definition.contract_id
);
self.wallet
.create_new_checkpoint(
&self.asset_definition.contract_id,
&state_root,
checkpoint_number,
signatures,
)
.await?;
}
self.num_calls += 1;
info!(
target: LOG_TARGET,
"✅ Creating checkpoint #{} for contract {}", checkpoint_number, self.asset_definition.contract_id
);

self.wallet
.create_new_checkpoint(
&self.asset_definition.contract_id,
&state_root,
checkpoint_number,
signatures,
)
.await?;
Ok(())
}
}
10 changes: 8 additions & 2 deletions dan_layer/core/src/services/service_specification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ use crate::{
SigningService,
ValidatorNodeClientFactory,
},
storage::{chain::ChainDbBackendAdapter, global::GlobalDbBackendAdapter, ChainStorageService, DbFactory},
storage::{
chain::{ChainDbBackendAdapter, ChainDbMetadataKey},
global::GlobalDbBackendAdapter,
ChainStorageService,
DbFactory,
MetadataBackendAdapter,
},
};

/// A trait to describe a specific configuration of services. This type allows other services to
Expand All @@ -50,7 +56,7 @@ pub trait ServiceSpecification: Default + Clone {
type AssetProcessor: AssetProcessor + Clone;
type AssetProxy: AssetProxy + Clone;
type BaseNodeClient: BaseNodeClient + Clone;
type ChainDbBackendAdapter: ChainDbBackendAdapter;
type ChainDbBackendAdapter: ChainDbBackendAdapter + MetadataBackendAdapter<ChainDbMetadataKey>;
type ChainStorageService: ChainStorageService<Self::Payload>;
type CheckpointManager: CheckpointManager;
type CommitteeManager: CommitteeManager<Self::Addr>;
Expand Down
15 changes: 11 additions & 4 deletions dan_layer/core/src/workers/consensus_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ impl<'a, T: ServiceSpecification<Addr = PublicKey>> ConsensusWorkerProcessor<'a,
}

async fn commit(&mut self) -> Result<ConsensusWorkerStateEvent, DigitalAssetError> {
let current_checkpoint_num = self.chain_db.get_current_checkpoint_number()?;
let mut unit_of_work = self.chain_db.new_unit_of_work();
let mut state = states::CommitState::<T>::new(
self.worker.node_address.clone(),
Expand All @@ -285,6 +286,7 @@ impl<'a, T: ServiceSpecification<Addr = PublicKey>> ConsensusWorkerProcessor<'a,
&mut self.worker.outbound_service,
&self.worker.signing_service,
unit_of_work.clone(),
current_checkpoint_num,
)
.await?;
unit_of_work.commit()?;
Expand Down Expand Up @@ -313,10 +315,15 @@ impl<'a, T: ServiceSpecification<Addr = PublicKey>> ConsensusWorkerProcessor<'a,
if let Some(mut state_tx) = self.worker.state_db_unit_of_work.take() {
state_tx.commit()?;
let signatures = state.collected_checkpoint_signatures();
self.worker
.checkpoint_manager
.create_checkpoint(state_tx.calculate_root()?, signatures)
.await?;
// TODO: Read checkpoint interval from constitution
if self.worker.current_view_id.as_u64() % 50 == 0 {
let checkpoint_number = self.chain_db.get_current_checkpoint_number()?;
self.worker
.checkpoint_manager
.create_checkpoint(checkpoint_number, state_tx.calculate_root()?, signatures)
.await?;
self.chain_db.increment_checkpoint_number()?;
}
Ok(res)
} else {
// technically impossible
Expand Down

0 comments on commit 09ab905

Please sign in to comment.