Skip to content

Commit

Permalink
Merge pull request #1678 from iotaledger/core-node/epoch-change-relat…
Browse files Browse the repository at this point in the history
…ed-code

doc(node): Add doc for epoch change related code
  • Loading branch information
bingyanglin authored Aug 19, 2024
2 parents 9a386ef + 50fa235 commit ad6fe68
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 10 deletions.
4 changes: 4 additions & 0 deletions consensus/core/src/authority_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub enum NetworkType {
}

impl ConsensusAuthority {
/// Starts the `ConsensusAuthority` for the specified network type.
pub async fn start(
network_type: NetworkType,
own_index: AuthorityIndex,
Expand Down Expand Up @@ -143,6 +144,9 @@ impl<N> AuthorityNode<N>
where
N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
{
/// This function initializes and starts the consensus authority node
/// It ensures that the authority node is fully initialized and
/// ready to participate in the consensus process.
pub(crate) async fn start(
own_index: AuthorityIndex,
committee: Committee,
Expand Down
2 changes: 2 additions & 0 deletions consensus/core/src/core_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ pub(crate) struct ChannelCoreThreadDispatcher {
}

impl ChannelCoreThreadDispatcher {
/// Starts the core thread for the consensus authority and returns a
/// dispatcher and handle for managing the core thread.
pub(crate) fn start(core: Core, context: Arc<Context>) -> (Self, CoreThreadHandle) {
let (sender, receiver) = metered_channel::channel_with_total(
CORE_THREAD_COMMANDS_CHANNEL_SIZE,
Expand Down
8 changes: 8 additions & 0 deletions consensus/core/src/leader_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub(crate) struct LeaderTimeoutTask<D: CoreThreadDispatcher> {
}

impl<D: CoreThreadDispatcher> LeaderTimeoutTask<D> {
/// Starts the leader timeout task, which monitors and manages the leader
/// election timeout mechanism.
pub fn start(
dispatcher: Arc<D>,
signals_receivers: &CoreSignalsReceivers,
Expand All @@ -58,6 +60,12 @@ impl<D: CoreThreadDispatcher> LeaderTimeoutTask<D> {
}
}

/// Runs the leader timeout task, managing the leader election timeout
/// mechanism in an asynchronous loop.
/// This mechanism ensures that if the current leader fails to produce a new
/// block within the specified timeout, the task forces the creation of a
/// new block, maintaining the continuity and robustness of the leader
/// election process.
async fn run(&mut self) {
let new_round = &mut self.new_round_receiver;
let mut leader_round: Round = *new_round.borrow_and_update();
Expand Down
1 change: 1 addition & 0 deletions consensus/core/src/network/anemo_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ impl<S: NetworkService> NetworkManager<S> for AnemoManager {
self.client.clone()
}

/// Installs and starts the consensus service on the specified network.
async fn install_service(&mut self, network_keypair: NetworkKeyPair, service: Arc<S>) {
self.context
.metrics
Expand Down
2 changes: 2 additions & 0 deletions consensus/core/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThread
}

impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
/// Starts the synchronizer, which is responsible for fetching blocks from
/// other authorities and managing block synchronization tasks.
pub fn start(
network_client: Arc<C>,
context: Arc<Context>,
Expand Down
12 changes: 11 additions & 1 deletion crates/iota-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,15 @@ pub struct ExecutionComponents {
pub(crate) module_cache: Arc<SyncModuleCache<ResolverWrapper<ExecutionCache>>>,
metrics: Arc<ResolverMetrics>,
}

/// The `AuthorityPerEpochStore` struct manages state and resources specific to
/// each epoch within a validator's lifecycle. It includes the validator's name,
/// the committee for the current epoch, and various in-memory caches and
/// notification mechanisms to track the state of transactions and certificates.
/// The struct also incorporates mechanisms for managing signature verification,
/// epoch transitions, and randomness generation. Additionally, it maintains
/// locks and barriers to ensure that tasks related to reconfiguration and epoch
/// transitions are handled correctly and without interference. This struct is
/// designed to manage tasks and data that are valid only within a single epoch.
pub struct AuthorityPerEpochStore {
/// The name of this authority.
pub(crate) name: AuthorityName,
Expand Down Expand Up @@ -1786,6 +1794,8 @@ impl AuthorityPerEpochStore {
.multi_contains_keys(keys)?)
}

/// Notifies the epoch store that the specified consensus messages have been
/// processed.
pub async fn consensus_messages_processed_notify(
&self,
keys: Vec<SequencedConsensusTransactionKey>,
Expand Down
15 changes: 8 additions & 7 deletions crates/iota-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,14 @@ impl AuthorityStoreMetrics {
}
}

/// ALL_OBJ_VER determines whether we want to store all past
/// versions of every object in the store. Authority doesn't store
/// them, but other entities such as replicas will.
/// S is a template on Authority signature state. This allows IotaDataStore to
/// be used on either authorities or non-authorities. Specifically, when storing
/// transactions and effects, S allows IotaDataStore to either store the
/// authority signed version or unsigned version.
/// The `AuthorityStore` manages the state and operations of an authority's
/// store. It includes a `mutex_table` to handle concurrent writes to the
/// database and references to various tables stored in
/// `AuthorityPerpetualTables`. The struct provides mechanisms for initializing
/// and accessing locks, managing objects and transactions, and performing
/// epoch-specific operations. It also includes methods for recovering from
/// crashes, checking IOTA conservation, and handling object markers and states
/// during epoch transitions.
pub struct AuthorityStore {
/// Internal vector of locks to manage concurrent writes to the database
mutex_table: MutexTable<ObjectDigest>,
Expand Down
2 changes: 2 additions & 0 deletions crates/iota-core/src/authority/authority_store_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,8 @@ impl AuthorityStorePruner {
pruned_checkpoint + delta
}

/// Sets up the pruning service for the authority store, configuring
/// intervals and conditions for object and checkpoint pruning.
fn setup_pruning(
config: AuthorityStorePruningConfig,
epoch_duration_ms: u64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub enum EpochStartConfiguration {
}

impl EpochStartConfiguration {
/// Constructs a new `EpochStartConfigurationV5` for the given epoch.
pub fn new(
system_state: EpochStartSystemState,
epoch_digest: CheckpointDigest,
Expand Down
16 changes: 16 additions & 0 deletions crates/iota-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,8 @@ impl CheckpointBuilder {
}
}

/// Runs the `CheckpointBuilder` in an asynchronous loop, managing the
/// creation of checkpoints.
async fn run(mut self) {
info!("Starting CheckpointBuilder");
'main: loop {
Expand Down Expand Up @@ -865,6 +867,8 @@ impl CheckpointBuilder {
info!("Shutting down CheckpointBuilder");
}

/// Creates a checkpoint at the specified height using the provided pending
/// checkpoint data.
#[instrument(level = "debug", skip_all, fields(?height))]
async fn make_checkpoint(
&self,
Expand Down Expand Up @@ -898,6 +902,7 @@ impl CheckpointBuilder {
Ok(())
}

/// Writes the new checkpoints to the DB storage and processes them.
#[instrument(level = "debug", skip_all)]
async fn write_checkpoints(
&self,
Expand Down Expand Up @@ -1010,6 +1015,8 @@ impl CheckpointBuilder {
Ok(chunks)
}

/// Creates checkpoints using the provided transaction effects and pending
/// checkpoint information.
#[instrument(level = "debug", skip_all)]
async fn create_checkpoints(
&self,
Expand Down Expand Up @@ -1261,6 +1268,8 @@ impl CheckpointBuilder {
}
}

/// Augments the last checkpoint of the epoch by creating and executing an
/// advance epoch transaction.
#[instrument(level = "error", skip_all)]
async fn augment_epoch_last_checkpoint(
&self,
Expand Down Expand Up @@ -1380,6 +1389,11 @@ impl CheckpointAggregator {
}
}

/// Runs the `CheckpointAggregator` in an asynchronous loop, managing the
/// aggregation of checkpoints.
/// The function ensures continuous aggregation of checkpoints, handling
/// errors and retries gracefully, and allowing for proper shutdown on
/// receiving an exit signal.
async fn run(mut self) {
info!("Starting CheckpointAggregator");
loop {
Expand Down Expand Up @@ -1818,6 +1832,8 @@ pub struct CheckpointService {
}

impl CheckpointService {
/// Spawns the checkpoint service, initializing and starting the checkpoint
/// builder and aggregator tasks.
pub fn spawn(
state: Arc<AuthorityState>,
checkpoint_store: Arc<CheckpointStore>,
Expand Down
2 changes: 2 additions & 0 deletions crates/iota-core/src/consensus_manager/mysticeti_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ impl MysticetiManager {
}

#[async_trait]

impl ConsensusManagerTrait for MysticetiManager {
/// Starts the Mysticeti consensus manager for the current epoch.
async fn start(
&self,
_config: &NodeConfig,
Expand Down
37 changes: 35 additions & 2 deletions crates/iota-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ impl IotaNode {
Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
}

/// Starts the JWK (JSON Web Key) updater tasks for the specified node
/// configuration.
/// This function ensures continuous fetching, validation, and submission of
/// JWKs, maintaining up-to-date keys for the specified providers.
fn start_jwk_updater(
config: &NodeConfig,
metrics: Arc<IotaNodeMetrics>,
Expand Down Expand Up @@ -1059,6 +1063,8 @@ impl IotaNode {
))
}

/// Asynchronously constructs and initializes the components necessary for
/// the validator node.
async fn construct_validator_components(
config: NodeConfig,
state: Arc<AuthorityState>,
Expand Down Expand Up @@ -1202,6 +1208,8 @@ impl IotaNode {
.await
}

/// Initializes and starts components specific to the current
/// epoch for the validator node.
async fn start_epoch_specific_validator_components(
config: &NodeConfig,
state: Arc<AuthorityState>,
Expand Down Expand Up @@ -1310,6 +1318,12 @@ impl IotaNode {
})
}

/// Starts the checkpoint service for the validator node, initializing
/// necessary components and settings.
/// The function ensures proper initialization of the checkpoint service,
/// preparing it to handle checkpoint creation and submission to consensus,
/// while also setting up the necessary monitoring and synchronization
/// mechanisms.
fn start_checkpoint_service(
config: &NodeConfig,
consensus_adapter: Arc<ConsensusAdapter>,
Expand Down Expand Up @@ -1469,8 +1483,9 @@ impl IotaNode {
}

/// This function awaits the completion of checkpoint execution of the
/// current epoch, after which it iniitiates reconfiguration of the
/// entire system.
/// current epoch, after which it initiates reconfiguration of the
/// entire system. This function also handles role changes for the node when
/// epoch changes.
pub async fn monitor_reconfiguration(self: Arc<Self>) -> Result<()> {
let mut checkpoint_executor = CheckpointExecutor::new(
self.state_sync_handle.subscribe_to_synced_checkpoints(),
Expand Down Expand Up @@ -1704,6 +1719,8 @@ impl IotaNode {
}
}

/// Asynchronously reconfigures the state of the authority node for the next
/// epoch.
async fn reconfigure_state(
&self,
state: &Arc<AuthorityState>,
Expand Down Expand Up @@ -1884,6 +1901,22 @@ fn build_kv_store(
)))
}

/// Builds and starts the HTTP server for the Iota node, exposing JSON-RPC and
/// REST APIs based on the node's configuration.
///
/// This function performs the following tasks:
/// 1. Checks if the node is a validator by inspecting the consensus
/// configuration; if so, it returns early as validators do not expose these
/// APIs.
/// 2. Creates an Axum router to handle HTTP requests.
/// 3. Initializes the JSON-RPC server and registers various RPC modules based
/// on the node's state and configuration, including CoinApi,
/// TransactionBuilderApi, GovernanceApi, TransactionExecutionApi, and
/// IndexerApi.
/// 4. Optionally, if the REST API is enabled, nests the REST API router under
/// the `/rest` path.
/// 5. Binds the server to the specified JSON-RPC address and starts listening
/// for incoming connections.
pub async fn build_http_server(
state: Arc<AuthorityState>,
store: RocksDbStore,
Expand Down

0 comments on commit ad6fe68

Please sign in to comment.