[dag] add some structured logging
zekun000 committed Oct 3, 2023
1 parent aad2835 commit 89a3dd3
Showing 9 changed files with 121 additions and 19 deletions.
12 changes: 8 additions & 4 deletions consensus/src/dag/bootstrap.rs
@@ -19,6 +19,7 @@ use crate::{
dag::{
adapter::{compute_initial_block_and_ledger_info, LedgerInfoProvider},
dag_state_sync::StateSyncStatus,
observability::logging::{LogEvent, LogSchema},
},
experimental::buffer_manager::OrderedBlocks,
network::IncomingDAGRequest,
@@ -208,9 +209,8 @@ impl DagBootstrapper {
let (parent_block_info, ledger_info) =
compute_initial_block_and_ledger_info(ledger_info_from_storage);
debug!(
"Starting DAG instance for epoch {} round {}",
self.epoch_state.epoch,
ledger_info.commit_info().round(),
LogSchema::new(LogEvent::Start).round(ledger_info.commit_info().round()),
epoch = self.epoch_state.epoch,
);

let ledger_info_provider = Arc::new(RwLock::new(LedgerInfoProvider::new(ledger_info)));
@@ -266,7 +266,11 @@ impl DagBootstrapper {
match sync_status {
StateSyncStatus::NeedsSync(certified_node_msg) => {
let highest_committed_anchor_round = ledger_info_provider.get_highest_committed_anchor_round();
debug!("state sync notification received for round {}, dag round {}, ordered round {:?} commit round {} ", certified_node_msg.round(), dag_store.read().highest_round(), dag_store.read().highest_ordered_anchor_round(), highest_committed_anchor_round);
debug!(LogSchema::new(LogEvent::StateSync).round(dag_store.read().highest_round()),
target_round = certified_node_msg.round(),
local_ordered_round = dag_store.read().highest_ordered_anchor_round(),
local_committed_round = highest_committed_anchor_round
);
let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone());

let sync_future = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone(), highest_committed_anchor_round);
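The bootstrap.rs hunk above shows the pattern this commit applies throughout: a positional format string is replaced by a typed LogSchema that carries the event and round, with the remaining context emitted as key = value fields. A minimal sketch of the same call shape, assuming it lives inside the same consensus crate so the new logging module is importable (the helper function and its parameters are illustrative, not part of the commit):

    use aptos_consensus_types::common::Round;
    use aptos_logger::debug;

    use crate::dag::observability::logging::{LogEvent, LogSchema};

    // Illustrative helper (not in the commit): the schema carries the event
    // and the local DAG round, while the extra context goes out as structured
    // key = value fields instead of being interpolated into a format string.
    fn log_state_sync_trigger(local_round: Round, target_round: Round, committed_round: Round) {
        debug!(
            LogSchema::new(LogEvent::StateSync).round(local_round),
            target_round = target_round,
            local_committed_round = committed_round,
        );
    }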
22 changes: 16 additions & 6 deletions consensus/src/dag/dag_driver.rs
@@ -16,6 +16,7 @@ use crate::{
dag_store::Dag,
observability::{
counters,
logging::{LogEvent, LogSchema},
tracing::{observe_node, observe_round, NodeStage, RoundStage},
},
types::{CertificateAckState, CertifiedNode, Node, SignatureBuilder},
@@ -85,11 +86,6 @@ impl DagDriver {
.get_strong_links_for_round(highest_round, &epoch_state.verifier)
.map_or_else(|| highest_round.saturating_sub(1), |_| highest_round);

debug!(
"highest_round: {}, current_round: {}",
highest_round, highest_strong_links_round
);

let mut driver = Self {
author,
epoch_state,
@@ -110,6 +106,10 @@
if let Some(node) =
pending_node.filter(|node| node.round() == highest_strong_links_round + 1)
{
debug!(
LogSchema::new(LogEvent::NewRound).round(node.round()),
"Resume round"
);
driver.current_round = node.round();
driver.broadcast_node(node);
} else {
@@ -147,7 +147,7 @@
}

pub async fn enter_new_round(&mut self, new_round: Round) {
debug!("entering new round {}", new_round);
debug!(LogSchema::new(LogEvent::NewRound).round(new_round));
counters::CURRENT_ROUND.set(new_round as i64);
let strong_links = self
.dag
@@ -236,10 +236,17 @@ impl DagDriver {
let node_clone = node.clone();
let timestamp = node.timestamp();
let node_broadcast = async move {
debug!(LogSchema::new(LogEvent::BroadcastNode), id = node.id());

defer!( observe_round(timestamp, RoundStage::NodeBroadcasted); );
rb.broadcast(node, signature_builder).await
};
let core_task = node_broadcast.then(move |certificate| {
debug!(
LogSchema::new(LogEvent::BroadcastCertifiedNode),
id = node_clone.id()
);

defer!( observe_round(timestamp, RoundStage::CertifiedNodeBroadcasted); );
let certified_node =
CertifiedNode::new(node_clone, certificate.signatures().to_owned());
@@ -271,6 +278,9 @@ impl RpcHandler for DagDriver {

async fn process(&mut self, certified_node: Self::Request) -> anyhow::Result<Self::Response> {
let epoch = certified_node.metadata().epoch();
debug!(LogSchema::new(LogEvent::ReceiveCertifiedNode)
.remote_peer(*certified_node.author())
.round(certified_node.round()));
{
let dag_reader = self.dag.read();
if dag_reader.exists(certified_node.metadata()) {
14 changes: 13 additions & 1 deletion consensus/src/dag/dag_fetcher.rs
@@ -5,6 +5,7 @@ use super::{dag_network::RpcWithFallback, types::NodeMetadata, RpcHandler};
use crate::dag::{
dag_network::TDAGNetworkSender,
dag_store::Dag,
observability::logging::{LogEvent, LogSchema},
types::{CertifiedNode, FetchResponse, Node, RemoteFetchRequest},
};
use anyhow::{anyhow, ensure};
@@ -253,7 +254,12 @@ impl TDagFetcher for DagFetcher {
responders: Vec<Author>,
dag: Arc<RwLock<Dag>>,
) -> anyhow::Result<()> {
debug!("Start fetch request: {:?}", remote_request);
debug!(
LogSchema::new(LogEvent::FetchNodes),
start_round = remote_request.start_round(),
target_round = remote_request.target_round(),
missing_nodes = remote_request.exists_bitmask().num_missing(),
);
let mut rpc = RpcWithFallback::new(
responders,
remote_request.clone().into(),
@@ -328,6 +334,12 @@ impl RpcHandler for FetchRequestHandler {
FetchRequestHandleError::TargetsMissing
);

debug!(
LogSchema::new(LogEvent::ReceiveFetchNodes).round(dag_reader.highest_round()),
start_round = message.start_round(),
target_round = message.target_round(),
);

let certified_nodes: Vec<_> = dag_reader
.reachable(
message.targets(),
1 change: 0 additions & 1 deletion consensus/src/dag/dag_state_sync.rs
@@ -226,7 +226,6 @@ impl DagStateSynchronizer {
},
}

// State sync
self.state_computer.sync_to(commit_li.clone()).await?;

Ok(Arc::into_inner(sync_dag_store).map(|r| r.into_inner()))
40 changes: 40 additions & 0 deletions consensus/src/dag/observability/logging.rs
@@ -0,0 +1,40 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_consensus_types::common::{Author, Round};
use aptos_logger::Schema;
use serde::Serialize;

#[derive(Schema)]
pub struct LogSchema {
event: LogEvent,
remote_peer: Option<Author>,
round: Option<Round>,
}

#[derive(Serialize)]
pub enum LogEvent {
Start,
BroadcastNode,
ReceiveNode,
Vote,
ReceiveVote,
BroadcastCertifiedNode,
ReceiveCertifiedNode,
ReceiveAck,
OrderedAnchor,
NewRound,
FetchNodes,
ReceiveFetchNodes,
StateSync,
}

impl LogSchema {
pub fn new(event: LogEvent) -> Self {
Self {
event,
remote_peer: None,
round: None,
}
}
}
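Call sites elsewhere in this commit chain builder-style setters on this schema, e.g. LogSchema::new(LogEvent::Start).round(...) and .remote_peer(...). Those setters come from aptos_logger's #[derive(Schema)]; a rough hand-written equivalent for the two Option fields, using the same Author and Round imports as the file above, is shown only to make the call sites easier to read (not part of the commit, and the real derive also wires the fields into the structured log output):

    // Approximate expansion of what #[derive(Schema)] is expected to provide
    // for the two Option fields of LogSchema (illustrative only).
    impl LogSchema {
        pub fn remote_peer(mut self, remote_peer: Author) -> Self {
            self.remote_peer = Some(remote_peer);
            self
        }

        pub fn round(mut self, round: Round) -> Self {
            self.round = Some(round);
            self
        }
    }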
1 change: 1 addition & 0 deletions consensus/src/dag/observability/mod.rs
@@ -2,4 +2,5 @@
// SPDX-License-Identifier: Apache-2.0

pub mod counters;
pub mod logging;
pub mod tracing;
10 changes: 7 additions & 3 deletions consensus/src/dag/order_rule.rs
@@ -6,7 +6,10 @@ use crate::dag::{
anchor_election::AnchorElection,
dag_state_sync::DAG_WINDOW,
dag_store::{Dag, NodeStatus},
observability::tracing::{observe_node, NodeStage},
observability::{
logging::{LogEvent, LogSchema},
tracing::{observe_node, NodeStage},
},
storage::DAGStorage,
types::NodeMetadata,
CertifiedNode,
@@ -202,8 +205,9 @@ impl OrderRule {
ordered_nodes.reverse();

debug!(
"Ordered anchor {}, reached round {} with {} nodes",
anchor.id(),
LogSchema::new(LogEvent::OrderedAnchor),
id = anchor.id(),
"Reached round {} with {} nodes",
lowest_round_to_reach,
ordered_nodes.len()
);
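The OrderedAnchor call in the hunk above mixes all three argument forms the aptos_logger macros accept in this diff: a schema, structured key = value fields, and a trailing free-form message with its format arguments. A small sketch of that shape, with a plain u64 standing in for the real anchor NodeId (the helper function is illustrative, not part of the commit):

    use aptos_logger::debug;

    use crate::dag::observability::logging::{LogEvent, LogSchema};

    // Illustrative only: schema first, then structured fields, then a
    // free-form message, mirroring the OrderedAnchor call above.
    fn log_ordered_anchor(anchor_id: u64, reached_round: u64, node_count: usize) {
        debug!(
            LogSchema::new(LogEvent::OrderedAnchor),
            id = anchor_id,
            "Reached round {} with {} nodes",
            reached_round,
            node_count
        );
    }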
15 changes: 12 additions & 3 deletions consensus/src/dag/rb_handler.rs
@@ -5,13 +5,16 @@ use super::{dag_fetcher::TFetchRequester, storage::DAGStorage, NodeId};
use crate::dag::{
dag_network::RpcHandler,
dag_store::Dag,
observability::tracing::{observe_node, NodeStage},
observability::{
logging::{LogEvent, LogSchema},
tracing::{observe_node, NodeStage},
},
types::{Node, NodeCertificate, Vote},
};
use anyhow::{bail, ensure};
use aptos_consensus_types::common::{Author, Round};
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_logger::{debug, error};
use aptos_types::{epoch_state::EpochState, validator_signer::ValidatorSigner};
use async_trait::async_trait;
use std::{collections::BTreeMap, mem, sync::Arc};
@@ -150,6 +153,9 @@ impl RpcHandler for NodeBroadcastHandler {
async fn process(&mut self, node: Self::Request) -> anyhow::Result<Self::Response> {
let node = self.validate(node)?;
observe_node(node.timestamp(), NodeStage::NodeReceived);
debug!(LogSchema::new(LogEvent::ReceiveNode)
.remote_peer(*node.author())
.round(node.round()));

let votes_by_peer = self
.votes_by_round_peer
@@ -161,8 +167,11 @@
let vote = Vote::new(node.metadata().clone(), signature);

self.storage.save_vote(&node.id(), &vote)?;
votes_by_peer.insert(*node.metadata().author(), vote.clone());
votes_by_peer.insert(*node.author(), vote.clone());

debug!(LogSchema::new(LogEvent::Vote)
.remote_peer(*node.author())
.round(node.round()));
Ok(vote)
},
Some(ack) => Ok(ack.clone()),
25 changes: 24 additions & 1 deletion consensus/src/dag/types.rs
@@ -2,7 +2,10 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
dag::observability::tracing::{observe_node, NodeStage},
dag::observability::{
logging::{LogEvent, LogSchema},
tracing::{observe_node, NodeStage},
},
network::TConsensusMsg,
network_interface::ConsensusMsg,
};
@@ -16,6 +19,7 @@ use aptos_crypto::{
};
use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher};
use aptos_enum_conversion_derive::EnumConversion;
use aptos_logger::debug;
use aptos_reliable_broadcast::{BroadcastStatus, RBMessage};
use aptos_types::{
aggregate_signature::{AggregateSignature, PartialSignatures},
@@ -520,6 +524,9 @@ impl BroadcastStatus<DAGMessage> for SignatureBuilder {

fn add(&mut self, peer: Author, ack: Self::Ack) -> anyhow::Result<Option<Self::Aggregated>> {
ensure!(self.metadata == ack.metadata, "Digest mismatch");
debug!(LogSchema::new(LogEvent::ReceiveVote)
.remote_peer(peer)
.round(self.metadata.round()));
self.partial_signatures.add_signature(peer, ack.signature);
Ok(self
.epoch_state
@@ -569,6 +576,7 @@ impl BroadcastStatus<DAGMessage> for CertificateAckState {
type Message = CertifiedNodeMessage;

fn add(&mut self, peer: Author, _ack: Self::Ack) -> anyhow::Result<Option<Self::Aggregated>> {
debug!(LogSchema::new(LogEvent::ReceiveAck).remote_peer(peer));
self.received.insert(peer);
if self.received.len() == self.num_validators {
Ok(Some(()))
@@ -608,6 +616,14 @@ impl RemoteFetchRequest {
pub fn exists_bitmask(&self) -> &DagSnapshotBitmask {
&self.exists_bitmask
}

pub fn start_round(&self) -> Round {
self.exists_bitmask.first_round()
}

pub fn target_round(&self) -> Round {
self.targets[0].round
}
}

impl TDAGMessage for RemoteFetchRequest {
@@ -805,6 +821,13 @@ impl DagSnapshotBitmask {
.unwrap_or(false)
}

pub fn num_missing(&self) -> usize {
self.bitmask
.iter()
.map(|round| round.iter().map(|exist| !*exist as usize).sum::<usize>())
.sum::<usize>()
}

pub fn first_round(&self) -> Round {
self.first_round
}
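The start_round and target_round helpers on RemoteFetchRequest and the num_missing helper on DagSnapshotBitmask exist mainly to feed the FetchNodes log fields shown earlier. The counting logic is simple enough to check in isolation; a self-contained sketch with a plain Vec<Vec<bool>> standing in for the bitmask (not part of the commit):

    // Stand-alone version of the counting in DagSnapshotBitmask::num_missing:
    // every `false` entry in the per-round existence bitmask is one node that
    // still has to be fetched.
    fn num_missing(bitmask: &[Vec<bool>]) -> usize {
        bitmask
            .iter()
            .map(|round| round.iter().filter(|exists| !**exists).count())
            .sum()
    }

    fn main() {
        // Two rounds tracked locally: one node missing in the first round,
        // two missing in the second.
        let bitmask = vec![vec![true, false, true], vec![false, false, true]];
        assert_eq!(num_missing(&bitmask), 3);
    }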
