Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix incorrect usage of tracing span in async functions. #10663

Merged
merged 2 commits into from
Mar 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 148 additions & 125 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -937,91 +937,101 @@ impl PeerActor {
msg_hash: CryptoHash,
body: RoutedMessageBody,
) -> Result<Option<RoutedMessageBody>, ReasonForBan> {
let _span = tracing::trace_span!(
target: "network",
"receive_routed_message",
"type" = <&RoutedMessageBody as Into<&'static str>>::into(&body)
)
.entered();
Ok(match body {
RoutedMessageBody::TxStatusRequest(account_id, tx_hash) => network_state
.client
.send_async(TxStatusRequest { tx_hash, signer_account_id: account_id })
.await
.ok()
.flatten()
.map(|response| RoutedMessageBody::TxStatusResponse(*response)),
RoutedMessageBody::TxStatusResponse(tx_result) => {
network_state.client.send_async(TxStatusResponse(tx_result.into())).await.ok();
None
}
RoutedMessageBody::StateResponse(info) => {
network_state
let body_type: &'static str = (&body).into();
let result = async {
Ok(match body {
RoutedMessageBody::TxStatusRequest(account_id, tx_hash) => network_state
.client
.send_async(StateResponse(StateResponseInfo::V1(info).into()))
.await
.ok();
None
}
RoutedMessageBody::BlockApproval(approval) => {
network_state.client.send_async(BlockApproval(approval, peer_id)).await.ok();
None
}
RoutedMessageBody::ForwardTx(transaction) => {
network_state
.client
.send_async(ProcessTxRequest {
transaction,
is_forwarded: true,
check_only: false,
})
.send_async(TxStatusRequest { tx_hash, signer_account_id: account_id })
.await
.ok();
None
}
RoutedMessageBody::PartialEncodedChunkRequest(request) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkRequest {
partial_encoded_chunk_request: request,
route_back: msg_hash,
},
);
None
}
RoutedMessageBody::PartialEncodedChunkResponse(response) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkResponse {
partial_encoded_chunk_response: response,
received_time: clock.now().into(),
},
);
None
}
RoutedMessageBody::VersionedPartialEncodedChunk(chunk) => {
network_state
.shards_manager_adapter
.send(ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(chunk));
None
}
RoutedMessageBody::PartialEncodedChunkForward(msg) => {
network_state
.shards_manager_adapter
.send(ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(msg));
None
}
RoutedMessageBody::ChunkStateWitness(witness) => {
network_state.client.send_async(ChunkStateWitnessMessage(witness)).await.ok();
None
}
RoutedMessageBody::ChunkEndorsement(endorsement) => {
network_state.client.send_async(ChunkEndorsementMessage(endorsement)).await.ok();
None
}
body => {
tracing::error!(target: "network", "Peer received unexpected message type: {:?}", body);
None
}
})
.ok()
.flatten()
.map(|response| RoutedMessageBody::TxStatusResponse(*response)),
RoutedMessageBody::TxStatusResponse(tx_result) => {
network_state.client.send_async(TxStatusResponse(tx_result.into())).await.ok();
None
}
RoutedMessageBody::StateResponse(info) => {
network_state
.client
.send_async(StateResponse(StateResponseInfo::V1(info).into()))
.await
.ok();
None
}
RoutedMessageBody::BlockApproval(approval) => {
network_state.client.send_async(BlockApproval(approval, peer_id)).await.ok();
None
}
RoutedMessageBody::ForwardTx(transaction) => {
network_state
.client
.send_async(ProcessTxRequest {
transaction,
is_forwarded: true,
check_only: false,
})
.await
.ok();
None
}
RoutedMessageBody::PartialEncodedChunkRequest(request) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkRequest {
partial_encoded_chunk_request: request,
route_back: msg_hash,
},
);
None
}
RoutedMessageBody::PartialEncodedChunkResponse(response) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkResponse {
partial_encoded_chunk_response: response,
received_time: clock.now().into(),
},
);
None
}
RoutedMessageBody::VersionedPartialEncodedChunk(chunk) => {
network_state
.shards_manager_adapter
.send(ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(chunk));
None
}
RoutedMessageBody::PartialEncodedChunkForward(msg) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(msg),
);
None
}
RoutedMessageBody::ChunkStateWitness(witness) => {
network_state.client.send_async(ChunkStateWitnessMessage(witness)).await.ok();
None
}
RoutedMessageBody::ChunkEndorsement(endorsement) => {
network_state
.client
.send_async(ChunkEndorsementMessage(endorsement))
.await
.ok();
None
}
body => {
tracing::error!(target: "network", "Peer received unexpected message type: {:?}", body);
None
}
})
};
// DO NOT turn this into a scoped .entered() span, because that is incompatible
// with async fns.
result
.instrument(tracing::trace_span!(
target: "network",
"receive_routed_message",
"type" = body_type,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case #[instrument] application would need to use an initially empty field and then record it later as demonstrated in the last example here.

))
.await
}

fn receive_message(
Expand Down Expand Up @@ -1482,39 +1492,45 @@ impl PeerActor {
conn: Arc<connection::Connection>,
rtu: RoutingTableUpdate,
) {
let _span = tracing::trace_span!(target: "network", "handle_sync_routing_table").entered();
if let Err(ban_reason) = network_state.add_edges(&clock, rtu.edges.clone()).await {
conn.stop(Some(ban_reason));
}
let result = async {
if let Err(ban_reason) = network_state.add_edges(&clock, rtu.edges.clone()).await {
conn.stop(Some(ban_reason));
}

// Also pass the edges to the V2 routing table
if let Err(ban_reason) = network_state
.update_routes(&clock, NetworkTopologyChange::EdgeNonceRefresh(rtu.edges))
.await
{
conn.stop(Some(ban_reason));
}
// Also pass the edges to the V2 routing table
if let Err(ban_reason) = network_state
.update_routes(&clock, NetworkTopologyChange::EdgeNonceRefresh(rtu.edges))
.await
{
conn.stop(Some(ban_reason));
}

// For every announce we received, we fetch the last announce with the same account_id
// that we already broadcasted. Client actor will both verify signatures of the received announces
// as well as filter out those which are older than the fetched ones (to avoid overriding
// a newer announce with an older one).
let old = network_state
.account_announcements
.get_broadcasted_announcements(rtu.accounts.iter().map(|a| &a.account_id));
let accounts: Vec<(AnnounceAccount, Option<EpochId>)> = rtu
.accounts
.into_iter()
.map(|aa| {
let id = aa.account_id.clone();
(aa, old.get(&id).map(|old| old.epoch_id.clone()))
})
.collect();
match network_state.client.send_async(AnnounceAccountRequest(accounts)).await {
Ok(Err(ban_reason)) => conn.stop(Some(ban_reason)),
Ok(Ok(accounts)) => network_state.add_accounts(accounts).await,
Err(_) => {}
}
// For every announce we received, we fetch the last announce with the same account_id
// that we already broadcasted. Client actor will both verify signatures of the received announces
// as well as filter out those which are older than the fetched ones (to avoid overriding
// a newer announce with an older one).
let old = network_state
.account_announcements
.get_broadcasted_announcements(rtu.accounts.iter().map(|a| &a.account_id));
let accounts: Vec<(AnnounceAccount, Option<EpochId>)> = rtu
.accounts
.into_iter()
.map(|aa| {
let id = aa.account_id.clone();
(aa, old.get(&id).map(|old| old.epoch_id.clone()))
})
.collect();
match network_state.client.send_async(AnnounceAccountRequest(accounts)).await {
Ok(Err(ban_reason)) => conn.stop(Some(ban_reason)),
Ok(Ok(accounts)) => network_state.add_accounts(accounts).await,
Err(_) => {}
}
};
// DO NOT turn this into a scoped .entered() span, because that is incompatible

result
.instrument(tracing::trace_span!(target: "network", "handle_sync_routing_table"))
.await
}

async fn handle_distance_vector(
Expand All @@ -1523,19 +1539,26 @@ impl PeerActor {
conn: Arc<connection::Connection>,
distance_vector: DistanceVector,
) {
let _span = tracing::trace_span!(target: "network", "handle_distance_vector").entered();
let result = async {
if conn.peer_info.id != distance_vector.root {
conn.stop(Some(ReasonForBan::InvalidDistanceVector));
return;
}

if conn.peer_info.id != distance_vector.root {
conn.stop(Some(ReasonForBan::InvalidDistanceVector));
return;
}
if let Err(ban_reason) = network_state
.update_routes(
&clock,
NetworkTopologyChange::PeerAdvertisedDistances(distance_vector),
)
.await
{
conn.stop(Some(ban_reason));
}
};

if let Err(ban_reason) = network_state
.update_routes(&clock, NetworkTopologyChange::PeerAdvertisedDistances(distance_vector))
.await
{
conn.stop(Some(ban_reason));
}
// DO NOT turn this into a scoped .entered() span, because that is incompatible
// with async fns.
result.instrument(tracing::trace_span!(target: "network", "handle_distance_vector")).await
}
}

Expand Down
Loading