Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[draft] decentralized state sync: p2p state part transfer #12095

Draft
wants to merge 23 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
f114ea8
learn own public addr from peer infos and keep it in network_state
saketh-are Sep 1, 2024
61e101f
add Tier3Handshake PeerMessage variant
saketh-are Sep 8, 2024
0530d27
add Tier3 connection pool
saketh-are Sep 9, 2024
d7535b9
fix test connection_pool::invalid_edge
saketh-are Sep 9, 2024
a7f8b83
add new routed message StatePartRequest
saketh-are Sep 9, 2024
aad7348
enable snapshot generation by default
saketh-are Sep 10, 2024
3c7eb09
send state part request over new routed msg
saketh-are Sep 10, 2024
3bc6c0a
add tier3_requests queue
saketh-are Sep 11, 2024
bfd99f6
implement response via tier3
saketh-are Sep 11, 2024
436cd6a
fix allowed messages
saketh-are Sep 12, 2024
8d11063
bugfix tier3 init
saketh-are Sep 12, 2024
02a8bb2
successfully handle multiple requests from same peer
saketh-are Sep 13, 2024
016650d
use SnapshotHostsCache for peer selection
saketh-are Sep 14, 2024
c0fd295
include shard_id in prio and fix tests
saketh-are Sep 15, 2024
b3499e6
simplify peer selection and fix some bugs
saketh-are Sep 15, 2024
fa6c069
implement an idle timeout for tier3 connections
saketh-are Sep 16, 2024
17f93e8
Merge remote-tracking branch 'origin/master' into dss-wip
saketh-are Sep 16, 2024
9a267cf
clear peer selector when state part is received
saketh-are Sep 17, 2024
4134f2c
fix test
saketh-are Sep 17, 2024
a826bdb
try peers first before external storage
saketh-are Sep 17, 2024
30742f7
Merge remote-tracking branch 'origin/master' into dss-wip
saketh-are Sep 18, 2024
2e384ca
Merge remote-tracking branch 'origin/master' into dss-wip
saketh-are Sep 18, 2024
0ecf34a
get catching_up test compiling
saketh-are Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
411 changes: 120 additions & 291 deletions chain/client/src/sync/state.rs

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions chain/client/src/test_utils/peer_manager_mock.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use near_network::types::SetChainInfo;
use near_network::types::{PeerManagerMessageRequest, PeerManagerMessageResponse};
use near_network::types::{
PeerManagerMessageRequest, PeerManagerMessageResponse, SetChainInfo, StateSyncEvent,
};

pub struct PeerManagerMock {
handle: Box<
Expand Down Expand Up @@ -37,3 +38,8 @@ impl actix::Handler<SetChainInfo> for PeerManagerMock {
type Result = ();
fn handle(&mut self, _msg: SetChainInfo, _ctx: &mut Self::Context) {}
}

impl actix::Handler<StateSyncEvent> for PeerManagerMock {
type Result = ();
fn handle(&mut self, _msg: StateSyncEvent, _ctx: &mut Self::Context) {}
}
11 changes: 7 additions & 4 deletions chain/client/src/tests/catching_up.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ enum ReceiptsSyncPhases {
pub struct StateRequestStruct {
pub shard_id: u64,
pub sync_hash: CryptoHash,
pub sync_prev_prev_hash: Option<CryptoHash>,
pub part_id: Option<u64>,
pub peer_id: PeerId,
pub peer_id: Option<PeerId>,
}

/// Sanity checks that the incoming and outgoing receipts are properly sent and received
Expand Down Expand Up @@ -268,8 +269,9 @@ fn test_catchup_receipts_sync_common(wait_till: u64, send: u64, sync_hold: bool)
let srs = StateRequestStruct {
shard_id: *shard_id,
sync_hash: *sync_hash,
sync_prev_prev_hash: None,
part_id: None,
peer_id: peer_id.clone(),
peer_id: Some(peer_id.clone()),
};
if !seen_hashes_with_state
.contains(&hash_func(&borsh::to_vec(&srs).unwrap()))
Expand All @@ -283,16 +285,17 @@ fn test_catchup_receipts_sync_common(wait_till: u64, send: u64, sync_hold: bool)
if let NetworkRequests::StateRequestPart {
shard_id,
sync_hash,
sync_prev_prev_hash,
part_id,
peer_id,
} = msg
{
if sync_hold {
let srs = StateRequestStruct {
shard_id: *shard_id,
sync_hash: *sync_hash,
sync_prev_prev_hash: Some(*sync_prev_prev_hash),
part_id: Some(*part_id),
peer_id: peer_id.clone(),
peer_id: None,
};
if !seen_hashes_with_state
.contains(&hash_func(&borsh::to_vec(&srs).unwrap()))
Expand Down
3 changes: 3 additions & 0 deletions chain/network/src/network_protocol/borsh_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ impl From<&mem::PeerMessage> for net::PeerMessage {
panic!("Tier1Handshake is not supported in Borsh encoding")
}
mem::PeerMessage::Tier2Handshake(h) => net::PeerMessage::Handshake((&h).into()),
mem::PeerMessage::Tier3Handshake(_) => {
panic!("Tier3Handshake is not supported in Borsh encoding")
}
mem::PeerMessage::HandshakeFailure(pi, hfr) => {
net::PeerMessage::HandshakeFailure(pi, (&hfr).into())
}
Expand Down
3 changes: 3 additions & 0 deletions chain/network/src/network_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ pub struct Disconnect {
pub enum PeerMessage {
Tier1Handshake(Handshake),
Tier2Handshake(Handshake),
Tier3Handshake(Handshake),
HandshakeFailure(PeerInfo, HandshakeFailureReason),
/// When a failed nonce is used by some peer, this message is sent back as evidence.
LastEdge(Edge),
Expand Down Expand Up @@ -552,6 +553,7 @@ pub enum RoutedMessageBody {
VersionedChunkEndorsement(ChunkEndorsement),
EpochSyncRequest,
EpochSyncResponse(CompressedEpochSyncProof),
StatePartRequest(StatePartRequest),
}

impl RoutedMessageBody {
Expand Down Expand Up @@ -645,6 +647,7 @@ impl fmt::Debug for RoutedMessageBody {
RoutedMessageBody::EpochSyncResponse(_) => {
write!(f, "EpochSyncResponse")
}
RoutedMessageBody::StatePartRequest(_) => write!(f, "StatePartRequest"),
}
}
}
Expand Down
16 changes: 7 additions & 9 deletions chain/network/src/network_protocol/network.proto
Original file line number Diff line number Diff line change
Expand Up @@ -458,17 +458,15 @@ message PeerMessage {
TraceContext trace_context = 26;

oneof message_type {
// Handshakes for TIER1 and TIER2 networks are considered separate,
// so that a node binary which doesn't support TIER1 connection won't
// be even able to PARSE the handshake. This way we avoid accidental
// connections, such that one end thinks it is a TIER2 connection and the
// other thinks it is a TIER1 connection. As currently both TIER1 and TIER2
// connections are handled by the same PeerActor, both fields use the same
// underlying message type. If we ever decide to separate the handshake
// implementations, we can copy the Handshake message type defition and
// make it evolve differently for TIER1 and TIER2.
// Handshakes for different network tiers explicitly use different PeerMessage variants.
// This way we avoid accidental connections, such that one end thinks it is a TIER2 connection
// and the other thinks it is a TIER1 connection. Currently the same PeerActor handles
// all types of connections, hence the contents are identical for all types of connections.
// If we ever decide to separate the handshake implementations, we can copy the Handshake message
// type definition and make it evolve differently for different tiers.
Handshake tier1_handshake = 27;
Handshake tier2_handshake = 4;
Handshake tier3_handshake = 33;

HandshakeFailure handshake_failure = 5;
LastEdge last_edge = 6;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ impl From<&PeerMessage> for proto::PeerMessage {
message_type: Some(match x {
PeerMessage::Tier1Handshake(h) => ProtoMT::Tier1Handshake(h.into()),
PeerMessage::Tier2Handshake(h) => ProtoMT::Tier2Handshake(h.into()),
PeerMessage::Tier3Handshake(h) => ProtoMT::Tier3Handshake(h.into()),
PeerMessage::HandshakeFailure(pi, hfr) => {
ProtoMT::HandshakeFailure((pi, hfr).into())
}
Expand Down Expand Up @@ -398,6 +399,9 @@ impl TryFrom<&proto::PeerMessage> for PeerMessage {
ProtoMT::Tier2Handshake(h) => {
PeerMessage::Tier2Handshake(h.try_into().map_err(Self::Error::Handshake)?)
}
ProtoMT::Tier3Handshake(h) => {
PeerMessage::Tier3Handshake(h.try_into().map_err(Self::Error::Handshake)?)
}
ProtoMT::HandshakeFailure(hf) => {
let (pi, hfr) = hf.try_into().map_err(Self::Error::HandshakeFailure)?;
PeerMessage::HandshakeFailure(pi, hfr)
Expand Down
23 changes: 23 additions & 0 deletions chain/network/src/network_protocol/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,26 @@ pub enum SnapshotHostInfoVerificationError {
)]
TooManyShards(usize),
}

/// Message used to request a state part.
///
#[derive(
Clone,
Debug,
Eq,
PartialEq,
Hash,
borsh::BorshSerialize,
borsh::BorshDeserialize,
ProtocolSchema,
)]
pub struct StatePartRequest {
/// Requested shard id
pub shard_id: ShardId,
/// Hash of the requested snapshot's state root
pub sync_hash: CryptoHash,
/// Requested part id
pub part_id: u64,
/// Public address of the node making the request
pub addr: std::net::SocketAddr,
}
42 changes: 37 additions & 5 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,18 @@ impl PeerActor {
.start_outbound(peer_id.clone())
.map_err(ClosingReason::OutboundNotAllowed)?
}
tcp::Tier::T3 => {
// Loop connections are allowed only on T1; see comment above
if peer_id == &network_state.config.node_id() {
return Err(ClosingReason::OutboundNotAllowed(
connection::PoolError::UnexpectedLoopConnection,
));
}
network_state
.tier3
.start_outbound(peer_id.clone())
.map_err(ClosingReason::OutboundNotAllowed)?
}
},
handshake_spec: HandshakeSpec {
partial_edge_info: network_state.propose_edge(&clock, peer_id, None),
Expand All @@ -293,10 +305,12 @@ impl PeerActor {
},
},
};
// Override force_encoding for outbound Tier1 connections,
// since Tier1Handshake is supported only with proto encoding.
// Override force_encoding for outbound Tier1 and Tier3 connections;
// Tier1Handshake and Tier3Handshake are supported only with proto encoding.
let force_encoding = match &stream.type_ {
tcp::StreamType::Outbound { tier, .. } if tier == &tcp::Tier::T1 => {
tcp::StreamType::Outbound { tier, .. }
if tier == &tcp::Tier::T1 || tier == &tcp::Tier::T3 =>
{
Some(Encoding::Proto)
}
_ => force_encoding,
Expand Down Expand Up @@ -480,6 +494,7 @@ impl PeerActor {
let msg = match spec.tier {
tcp::Tier::T1 => PeerMessage::Tier1Handshake(handshake),
tcp::Tier::T2 => PeerMessage::Tier2Handshake(handshake),
tcp::Tier::T3 => PeerMessage::Tier3Handshake(handshake),
};
self.send_message_or_log(&msg);
}
Expand Down Expand Up @@ -939,6 +954,9 @@ impl PeerActor {
(PeerStatus::Connecting { .. }, PeerMessage::Tier2Handshake(msg)) => {
self.process_handshake(ctx, tcp::Tier::T2, msg)
}
(PeerStatus::Connecting { .. }, PeerMessage::Tier3Handshake(msg)) => {
self.process_handshake(ctx, tcp::Tier::T3, msg)
}
(_, msg) => {
tracing::warn!(target:"network","unexpected message during handshake: {}",msg)
}
Expand Down Expand Up @@ -1140,7 +1158,9 @@ impl PeerActor {

self.stop(ctx, ClosingReason::DisconnectMessage);
}
PeerMessage::Tier1Handshake(_) | PeerMessage::Tier2Handshake(_) => {
PeerMessage::Tier1Handshake(_)
| PeerMessage::Tier2Handshake(_)
| PeerMessage::Tier3Handshake(_) => {
// Received handshake after already have seen handshake from this peer.
tracing::debug!(target: "network", "Duplicate handshake from {}", self.peer_info);
}
Expand Down Expand Up @@ -1182,8 +1202,20 @@ impl PeerActor {
self.stop(ctx, ClosingReason::Ban(ReasonForBan::Abusive));
}

// Add received peers to the peer store
let node_id = self.network_state.config.node_id();

// Record our own IP address as observed by the peer.
if self.network_state.my_public_addr.read().is_none() {
if let Some(my_peer_info) =
direct_peers.iter().find(|peer_info| peer_info.id == node_id)
{
if let Some(addr) = my_peer_info.addr {
let mut my_public_addr = self.network_state.my_public_addr.write();
*my_public_addr = Some(addr);
}
}
}
// Add received indirect peers to the peer store
self.network_state.peer_store.add_indirect_peers(
&self.clock,
peers.into_iter().filter(|peer_info| peer_info.id != node_id),
Expand Down
4 changes: 4 additions & 0 deletions chain/network/src/peer_manager/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ impl tcp::Tier {
match msg {
PeerMessage::Tier1Handshake(_) => self == tcp::Tier::T1,
PeerMessage::Tier2Handshake(_) => self == tcp::Tier::T2,
PeerMessage::Tier3Handshake(_) => self == tcp::Tier::T3,
PeerMessage::HandshakeFailure(_, _) => true,
PeerMessage::LastEdge(_) => true,
PeerMessage::VersionedStateResponse(_) => {
self == tcp::Tier::T2 || self == tcp::Tier::T3
}
PeerMessage::Routed(msg) => self.is_allowed_routed(&msg.body),
_ => self == tcp::Tier::T2,
}
Expand Down
Loading
Loading