Skip to content

Commit

Permalink
Adds eicIsLocalRootPeer to ExpandedInitiatorContext
Browse files Browse the repository at this point in the history
  • Loading branch information
bolt12 committed Mar 19, 2024
1 parent 838df55 commit 0bd3e0d
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 34 deletions.
3 changes: 2 additions & 1 deletion ouroboros-network-framework/demo/connection-manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ bidirectionalExperiment
eicControlMessage = readTVar
. projectBundle tok
$ controlMessageBundle,
eicIsBigLedgerPeer = IsNotBigLedgerPeer
eicIsBigLedgerPeer = IsNotBigLedgerPeer,
eicIsLocalRootPeer = False
})
muxBundle
res <-
Expand Down
3 changes: 2 additions & 1 deletion ouroboros-network-framework/src/Ouroboros/Network/Context.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import Ouroboros.Network.PeerSelection.LedgerPeers.Type
data ExpandedInitiatorContext addr m = ExpandedInitiatorContext {
eicConnectionId :: !(ConnectionId addr),
eicControlMessage :: !(ControlMessageSTM m),
eicIsBigLedgerPeer :: !IsBigLedgerPeer
eicIsBigLedgerPeer :: !IsBigLedgerPeer,
eicIsLocalRootPeer :: !Bool
}

-- | A context passed to initiator mini-protocol execution for non-p2p
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,8 @@ runInitiatorProtocols singMuxMode mux bundle controlBundle connId = do
initiatorCtx = ExpandedInitiatorContext {
eicConnectionId = connId,
eicControlMessage = controlMessage,
eicIsBigLedgerPeer = IsNotBigLedgerPeer
eicIsBigLedgerPeer = IsNotBigLedgerPeer,
eicIsLocalRootPeer = False
}

--
Expand Down
3 changes: 3 additions & 0 deletions ouroboros-network/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

### Breaking changes

* Added `eicIsLocalRootPeer :: Bool` field to NodeToNode
`ExpandedInitiatorContext`

### Non-Breaking changes

## 0.13.0.0 -- 2023-03-14
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,8 @@ mockPeerSelectionActions' tracer
traceWith tracer (TraceEnvPeerShareResult addr peeraddrs)
return (PeerSharingResult peeraddrs)

establishPeerConnection :: IsBigLedgerPeer -> PeerAddr -> m (PeerConn m)
establishPeerConnection _ peeraddr = do
establishPeerConnection :: IsBigLedgerPeer -> Bool -> PeerAddr -> m (PeerConn m)
establishPeerConnection _ _ peeraddr = do
--TODO: add support for variable delays and synchronous failure
traceWith tracer (TraceEnvEstablishConn peeraddr)
threadDelay 1
Expand Down Expand Up @@ -476,8 +476,8 @@ mockPeerSelectionActions' tracer
in loop
return conn

activatePeerConnection :: IsBigLedgerPeer -> PeerConn m -> m ()
activatePeerConnection _ (PeerConn peeraddr _ conn) = do
activatePeerConnection :: IsBigLedgerPeer -> Bool -> PeerConn m -> m ()
activatePeerConnection _ _ (PeerConn peeraddr _ conn) = do
traceWith tracer (TraceEnvActivatePeer peeraddr)
threadDelay 1
atomically $ do
Expand Down
3 changes: 2 additions & 1 deletion ouroboros-network/src/Ouroboros/Network/Diffusion/NonP2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -503,5 +503,6 @@ expandContext MinimalInitiatorContext { micConnectionId = connId } =
ExpandedInitiatorContext {
eicConnectionId = connId,
eicControlMessage = continueForever Proxy,
eicIsBigLedgerPeer = IsNotBigLedgerPeer
eicIsBigLedgerPeer = IsNotBigLedgerPeer,
eicIsLocalRootPeer = False
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ belowTargetBigLedgerPeers actions
publicRootPeers,
establishedPeers,
activePeers,
localRootPeers,
inProgressPromoteWarm,
inProgressDemoteWarm,
inProgressDemoteToCold,
Expand Down Expand Up @@ -115,8 +116,10 @@ belowTargetBigLedgerPeers actions
inProgressPromoteWarm = inProgressPromoteWarm
<> selectedToPromote
},
decisionJobs = [ jobPromoteWarmPeer actions policy peeraddr IsBigLedgerPeer peerconn
| (peeraddr, peerconn) <- Map.assocs selectedToPromote' ]
decisionJobs = [ jobPromoteWarmPeer actions policy peeraddr IsBigLedgerPeer isLocalRootPeer peerconn
| (peeraddr, peerconn) <- Map.assocs selectedToPromote'
, let isLocalRootPeer = LocalRootPeers.member peeraddr localRootPeers
]
}

-- If we could promote except that there are no peers currently available
Expand Down Expand Up @@ -204,8 +207,10 @@ belowTargetLocal actions
inProgressPromoteWarm = inProgressPromoteWarm
<> selectedToPromote
},
decisionJobs = [ jobPromoteWarmPeer actions policy peeraddr IsNotBigLedgerPeer peerconn
| (peeraddr, peerconn) <- Map.assocs selectedToPromote' ]
decisionJobs = [ jobPromoteWarmPeer actions policy peeraddr IsNotBigLedgerPeer isLocalRootPeer peerconn
| (peeraddr, peerconn) <- Map.assocs selectedToPromote'
, let isLocalRootPeer = LocalRootPeers.member peeraddr localRootPeers
]
}


Expand Down Expand Up @@ -287,8 +292,10 @@ belowTargetOther actions
inProgressPromoteWarm = inProgressPromoteWarm
<> selectedToPromote
},
decisionJobs = [ jobPromoteWarmPeer actions policy peeraddr IsNotBigLedgerPeer peerconn
| (peeraddr, peerconn) <- Map.assocs selectedToPromote' ]
decisionJobs = [ jobPromoteWarmPeer actions policy peeraddr IsNotBigLedgerPeer isLocalRootPeer peerconn
| (peeraddr, peerconn) <- Map.assocs selectedToPromote'
, let isLocalRootPeer = LocalRootPeers.member peeraddr localRootPeers
]
}

-- If we could promote except that there are no peers currently available
Expand All @@ -312,11 +319,12 @@ jobPromoteWarmPeer :: forall peeraddr peerconn m.
-> PeerSelectionPolicy peeraddr m
-> peeraddr
-> IsBigLedgerPeer
-> Bool
-> peerconn
-> Job () m (Completion m peeraddr peerconn)
jobPromoteWarmPeer PeerSelectionActions{peerStateActions = PeerStateActions {activatePeerConnection}}
PeerSelectionPolicy { policyErrorDelay }
peeraddr isBigLedgerPeer peerconn =
peeraddr isBigLedgerPeer isLocalRootPeer peerconn =
Job job handler () "promoteWarmPeer"
where
handler :: SomeException -> m (Completion m peeraddr peerconn)
Expand Down Expand Up @@ -411,7 +419,7 @@ jobPromoteWarmPeer PeerSelectionActions{peerStateActions = PeerStateActions {act

job :: m (Completion m peeraddr peerconn)
job = do
activatePeerConnection isBigLedgerPeer peerconn
activatePeerConnection isBigLedgerPeer isLocalRootPeer peerconn
return $ Completion $ \st@PeerSelectionState {
publicRootPeers,
activePeers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,10 @@ belowTargetLocal actions
inProgressPromoteCold = inProgressPromoteCold
<> selectedToPromote
},
decisionJobs = [ jobPromoteColdPeer actions policy peer IsNotBigLedgerPeer
| peer <- Set.toList selectedToPromote ]
decisionJobs = [ jobPromoteColdPeer actions policy peer IsNotBigLedgerPeer isLocalRootPeer
| peer <- Set.toList selectedToPromote
, let isLocalRootPeer = LocalRootPeers.member peer localRootPeers
]
}

-- If we could promote except that there are no peers currently available
Expand Down Expand Up @@ -177,6 +179,7 @@ belowTargetOther actions
knownPeers,
publicRootPeers,
establishedPeers,
localRootPeers,
inProgressPromoteCold,
targets = PeerSelectionTargets {
targetNumberOfEstablishedPeers
Expand Down Expand Up @@ -220,8 +223,10 @@ belowTargetOther actions
inProgressPromoteCold = inProgressPromoteCold
<> selectedToPromote
},
decisionJobs = [ jobPromoteColdPeer actions policy peer IsNotBigLedgerPeer
| peer <- Set.toList selectedToPromote ]
decisionJobs = [ jobPromoteColdPeer actions policy peer IsNotBigLedgerPeer isLocalRootPeer
| peer <- Set.toList selectedToPromote
, let isLocalRootPeer = LocalRootPeers.member peer localRootPeers
]
}

-- If we could connect except that there are no peers currently available
Expand Down Expand Up @@ -262,6 +267,7 @@ belowTargetBigLedgerPeers actions
publicRootPeers,
establishedPeers,
inProgressPromoteCold,
localRootPeers,
targets = PeerSelectionTargets {
targetNumberOfEstablishedBigLedgerPeers
},
Expand Down Expand Up @@ -310,8 +316,10 @@ belowTargetBigLedgerPeers actions
inProgressPromoteCold = inProgressPromoteCold
<> selectedToPromote
},
decisionJobs = [ jobPromoteColdPeer actions policy peer IsBigLedgerPeer
| peer <- Set.toList selectedToPromote ]
decisionJobs = [ jobPromoteColdPeer actions policy peer IsBigLedgerPeer isLocalRootPeer
| peer <- Set.toList selectedToPromote
, let isLocalRootPeer = LocalRootPeers.member peer localRootPeers
]
}

-- If we could connect except that there are no peers currently available
Expand Down Expand Up @@ -354,13 +362,14 @@ jobPromoteColdPeer :: forall peeraddr peerconn m.
-> PeerSelectionPolicy peeraddr m
-> peeraddr
-> IsBigLedgerPeer
-> Bool
-> Job () m (Completion m peeraddr peerconn)
jobPromoteColdPeer PeerSelectionActions {
peerStateActions = PeerStateActions {establishPeerConnection},
peerConnToPeerSharing
}
PeerSelectionPolicy { policyPeerShareActivationDelay }
peeraddr isBigLedgerPeer =
peeraddr isBigLedgerPeer isLocalRootPeer =
Job job handler () "promoteColdPeer"
where
handler :: SomeException -> m (Completion m peeraddr peerconn)
Expand Down Expand Up @@ -418,7 +427,7 @@ jobPromoteColdPeer PeerSelectionActions {
job = do
--TODO: decide if we should do timeouts here or if we should make that
-- the responsibility of establishPeerConnection
peerconn <- establishPeerConnection isBigLedgerPeer peeraddr
peerconn <- establishPeerConnection isBigLedgerPeer isLocalRootPeer peeraddr
let !peerSharing = peerConnToPeerSharing peerconn

return $ Completion $ \st@PeerSelectionState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,15 +327,15 @@ data PeerStateActions peeraddr peerconn m = PeerStateActions {
-- 'IsBigLedgerPeer' is passed from the outbound governor to the
-- mini-protocol callbacks.
--
establishPeerConnection :: IsBigLedgerPeer
establishPeerConnection :: IsBigLedgerPeer -> Bool
-> peeraddr -> m peerconn,

-- | Activate a connection: warm to hot promotion.
--
-- 'IsBigLedgerPeer' is passed from the outbound governor to the
-- mini-protocol callbacks.
--
activatePeerConnection :: IsBigLedgerPeer
activatePeerConnection :: IsBigLedgerPeer -> Bool
-> peerconn -> m (),

-- | Deactive a peer: hot to warm demotion.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,10 @@ data PeerConnectionHandle (muxMode :: MuxMode) responderCtx peerAddr versionData
mkInitiatorContext :: MonadSTM m
=> SingProtocolTemperature pt
-> IsBigLedgerPeer
-> Bool
-> PeerConnectionHandle muxMode responderCtx peerAddr versionDat bytes m a b
-> ExpandedInitiatorContext peerAddr m
mkInitiatorContext tok isBigLedgerPeer
mkInitiatorContext tok isBigLedgerPeer isLocalRootPeer
PeerConnectionHandle {
pchConnectionId = connectionId,
pchAppHandles = appHandles
Expand All @@ -444,7 +445,8 @@ mkInitiatorContext tok isBigLedgerPeer
ExpandedInitiatorContext {
eicConnectionId = connectionId,
eicControlMessage = readTVar (getControlVar tok appHandles),
eicIsBigLedgerPeer = isBigLedgerPeer
eicIsBigLedgerPeer = isBigLedgerPeer,
eicIsLocalRootPeer = isLocalRootPeer
}


Expand Down Expand Up @@ -719,9 +721,10 @@ withPeerStateActions PeerStateActionsArguments {

establishPeerConnection :: JobPool () m (Maybe SomeException)
-> IsBigLedgerPeer
-> Bool
-> peerAddr
-> m (PeerConnectionHandle muxMode responderCtx peerAddr versionData ByteString m a b)
establishPeerConnection jobPool isBigLedgerPeer remotePeerAddr =
establishPeerConnection jobPool isBigLedgerPeer isLocalRootPeer remotePeerAddr =
-- Protect consistency of the peer state with 'bracketOnError' if
-- opening a connection fails.
bracketOnError
Expand Down Expand Up @@ -753,8 +756,8 @@ withPeerStateActions PeerStateActionsArguments {
pchVersionData = versionData
}

startProtocols SingWarm isBigLedgerPeer connHandle
startProtocols SingEstablished isBigLedgerPeer connHandle
startProtocols SingWarm isBigLedgerPeer isLocalRootPeer connHandle
startProtocols SingEstablished isBigLedgerPeer isLocalRootPeer connHandle
atomically $ writeTVar peerStateVar PeerWarm
traceWith spsTracer (PeerStatusChanged
(ColdToWarm
Expand Down Expand Up @@ -877,10 +880,12 @@ withPeerStateActions PeerStateActionsArguments {
-- of time timeouts should be implemented here in the same way it is in
-- establishPeerConnection and deactivatePeerConnection.
activatePeerConnection :: IsBigLedgerPeer
-> Bool
-> PeerConnectionHandle muxMode responderCtx peerAddr versionData ByteString m a b
-> m ()
activatePeerConnection
isBigLedgerPeer
isLocalRootPeer
connHandle@PeerConnectionHandle {
pchConnectionId,
pchPeerStatus,
Expand All @@ -900,7 +905,7 @@ withPeerStateActions PeerStateActionsArguments {
throwIO $ ColdActivationException pchConnectionId

-- start hot peer protocols
startProtocols SingHot isBigLedgerPeer connHandle
startProtocols SingHot isBigLedgerPeer isLocalRootPeer connHandle

-- Only set the status to PeerHot if the peer isn't PeerCold.
-- This can happen asynchronously between the check above and now.
Expand Down Expand Up @@ -1098,9 +1103,10 @@ startProtocols :: forall (muxMode :: MuxMode) (pt :: ProtocolTemperature)
)
=> SingProtocolTemperature pt
-> IsBigLedgerPeer
-> Bool
-> PeerConnectionHandle muxMode responderCtx peerAddr versionData ByteString m a b
-> m ()
startProtocols tok isBigLedgerPeer connHandle@PeerConnectionHandle { pchMux, pchAppHandles } = do
startProtocols tok isBigLedgerPeer isLocalRootPeer connHandle@PeerConnectionHandle { pchMux, pchAppHandles } = do
let ptcls = getProtocols tok pchAppHandles
as <- traverse runInitiator ptcls
atomically $ writeTVar (getMiniProtocolsVar tok pchAppHandles)
Expand Down Expand Up @@ -1134,7 +1140,7 @@ startProtocols tok isBigLedgerPeer connHandle@PeerConnectionHandle { pchMux, pch
(runMiniProtocolCb initiator context)
where
context :: ExpandedInitiatorContext peerAddr m
context = mkInitiatorContext tok isBigLedgerPeer connHandle
context = mkInitiatorContext tok isBigLedgerPeer isLocalRootPeer connHandle


--
Expand Down

0 comments on commit 0bd3e0d

Please sign in to comment.