Skip to content

Commit

Permalink
Merge pull request #4803 from IntersectMBO/refactor/withpeerselection…
Browse files Browse the repository at this point in the history
…action

refactor withPeerSelectionAction argument list
  • Loading branch information
crocodile-dentist authored Feb 20, 2024
2 parents b118f38 + b4f086a commit 8ab4448
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 185 deletions.
1 change: 1 addition & 0 deletions ouroboros-network/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Updated type of constructor in `TraceLocalRootPeers`
* Added `TraceDebugState` message to `TracePeerSelection` for tracing
peer selection upon getting a USR1 sig.
* Changed withPeerSelectionActions and withLedgerPeers signatures

### Non-breaking changes

Expand Down
1 change: 1 addition & 0 deletions ouroboros-network/ouroboros-network.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ library
Ouroboros.Network.PeerSelection.State.EstablishedPeers
Ouroboros.Network.PeerSelection.State.KnownPeers
Ouroboros.Network.PeerSelection.State.LocalRootPeers
Ouroboros.Network.PeerSelection.RootPeersDNS
Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions
Ouroboros.Network.PeerSelection.RootPeersDNS.DNSSemaphore
Ouroboros.Network.PeerSelection.RootPeersDNS.LocalRootPeers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import Cardano.Slotting.Slot (SlotNo, WithOrigin (..))
import Control.Concurrent.Class.MonadSTM.Strict
import Ouroboros.Network.PeerSelection.LedgerPeers
import Ouroboros.Network.PeerSelection.RelayAccessPoint
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSSemaphore
import Ouroboros.Network.PeerSelection.RootPeersDNS
import Ouroboros.Network.Testing.Data.Script
import Test.Ouroboros.Network.PeerSelection.RootPeersDNS
import Test.QuickCheck
Expand Down Expand Up @@ -186,10 +186,13 @@ prop_pick100 seed (NonNegative n) (ArbLedgerPeersKind ledgerPeersKind) (MockRoot
dnsSemaphore <- newLedgerAndPublicRootDNSSemaphore

withLedgerPeers
rng dnsSemaphore (curry IP.toSockAddr) verboseTracer
(pure (UseLedgerPeers Always))
interface
(mockDNSActions @SomeException dnsMapVar dnsTimeoutScriptVar dnsLookupDelayScriptVar)
PeerActionsDNS { paToPeerAddr = curry IP.toSockAddr,
paDnsActions = (mockDNSActions @SomeException dnsMapVar dnsTimeoutScriptVar dnsLookupDelayScriptVar),
paDnsSemaphore = dnsSemaphore }
WithLedgerPeersArgs { wlpRng = rng,
wlpConsensusInterface = interface,
wlpTracer = verboseTracer,
wlpGetUseLedgerPeers = pure $ UseLedgerPeers Always }
(\request _ -> do
threadDelay 1900 -- we need to invalidate ledger peer's cache
resp <- request (NumberOfPeers 1) ledgerPeersKind
Expand Down Expand Up @@ -243,10 +246,13 @@ prop_pick (LedgerPools lps) (ArbLedgerPeersKind ledgerPeersKind) count seed (Moc
dnsSemaphore <- newLedgerAndPublicRootDNSSemaphore

withLedgerPeers
rng dnsSemaphore (curry IP.toSockAddr) verboseTracer
(pure (UseLedgerPeers (After 0)))
interface
(mockDNSActions @SomeException dnsMapVar dnsTimeoutScriptVar dnsLookupDelayScriptVar)
PeerActionsDNS { paToPeerAddr = curry IP.toSockAddr,
paDnsActions = mockDNSActions @SomeException dnsMapVar dnsTimeoutScriptVar dnsLookupDelayScriptVar,
paDnsSemaphore = dnsSemaphore }
WithLedgerPeersArgs { wlpRng = rng,
wlpConsensusInterface = interface,
wlpTracer = verboseTracer,
wlpGetUseLedgerPeers = pure $ UseLedgerPeers (After 0) }
(\request _ -> do
threadDelay 1900 -- we need to invalidate ledger peer's cache
resp <- request (NumberOfPeers count) ledgerPeersKind
Expand Down
134 changes: 56 additions & 78 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,15 @@ import Ouroboros.Network.PeerSelection.Governor qualified as Governor
import Ouroboros.Network.PeerSelection.Governor.Types
(ChurnMode (ChurnModeNormal), DebugPeerSelection (..),
PeerSelectionActions, PeerSelectionCounters (..),
PeerSelectionPolicy (..), PeerSelectionState, PeerStateActions,
PeerSelectionPolicy (..), PeerSelectionState,
PublicPeerSelectionState (..), TracePeerSelection (..),
emptyPeerSelectionState, emptyPublicPeerSelectionState)
#ifdef POSIX
import Ouroboros.Network.PeerSelection.Governor.Types
(makeDebugPeerSelectionState)
#endif
import Ouroboros.Network.PeerSelection.LedgerPeers
(LedgerPeersConsensusInterface, TraceLedgerPeers)
import Ouroboros.Network.PeerSelection.LedgerPeers (TraceLedgerPeers,
WithLedgerPeersArgs (..))
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
(LedgerPeersConsensusInterface (..), UseLedgerPeers)
#ifdef POSIX
Expand All @@ -129,6 +129,7 @@ import Ouroboros.Network.PeerSelection.PeerStateActions (PeerConnectionHandle,
pchPeerSharing, withPeerStateActions)
import Ouroboros.Network.PeerSelection.PeerTrustable (PeerTrustable)
import Ouroboros.Network.PeerSelection.RelayAccessPoint (RelayAccessPoint)
import Ouroboros.Network.PeerSelection.RootPeersDNS
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions (DNSActions,
DNSLookupType (..), ioDNSActions)
import Ouroboros.Network.PeerSelection.RootPeersDNS.LocalRootPeers
Expand Down Expand Up @@ -950,24 +951,13 @@ runM Interfaces
spsExitPolicy = exitPolicy
}

dnsSemaphore <- newLedgerAndPublicRootDNSSemaphore
--
-- Run peer selection (p2p governor)
--
let withPeerSelectionActions'
:: forall muxMode responderCtx peerAddr bytes a1 b c.
STM m (ntnAddr, PeerSharing)
-- ^ Read New Inbound Connections
-> PeerStateActions
ntnAddr
(PeerConnectionHandle
muxMode responderCtx peerAddr ntnVersionData bytes m a1 b)
m
-> StdGen
-- ^ Random generator for picking ledger peers
-> LedgerPeersConsensusInterface m
-- ^ Get Ledger Peers comes from here
-> STM m UseLedgerPeers
-- ^ Get Use Ledger After value
PeerSelectionActionsDiffusionMode ntnAddr (PeerConnectionHandle muxMode responderCtx peerAddr ntnVersionData bytes m a1 b) m
-> ( (Async m Void, Async m Void)
-> PeerSelectionActions
ntnAddr
Expand All @@ -979,20 +969,26 @@ runM Interfaces
-- (only if local root peers were non-empty).
-> m c
withPeerSelectionActions' =
withPeerSelectionActions
dtTraceLocalRootPeersTracer
dtTracePublicRootPeersTracer
dtTraceLedgerPeersTracer
diNtnToPeerAddr
(diDnsActions lookupReqs)
(readTVar peerSelectionTargetsVar)
lpGetLedgerStateJudgement
daReadLocalRootPeers
daReadPublicRootPeers
daReadUseBootstrapPeers
daOwnPeerSharing
(pchPeerSharing diNtnPeerSharing)
(readTVar (getPeerSharingRegistry daPeerSharingRegistry))
withPeerSelectionActions PeerActionsDNS {
paToPeerAddr = diNtnToPeerAddr,
paDnsActions = diDnsActions lookupReqs,
paDnsSemaphore = dnsSemaphore }
PeerSelectionActionsArgs {
psLocalRootPeersTracer = dtTraceLocalRootPeersTracer,
psPublicRootPeersTracer = dtTracePublicRootPeersTracer,
psReadTargets = readTVar peerSelectionTargetsVar,
psJudgement = lpGetLedgerStateJudgement,
psReadLocalRootPeers = daReadLocalRootPeers,
psReadPublicRootPeers = daReadPublicRootPeers,
psReadUseBootstrapPeers = daReadUseBootstrapPeers,
psPeerSharing = daOwnPeerSharing,
psPeerConnToPeerSharing = pchPeerSharing diNtnPeerSharing,
psReadPeerSharingController = readTVar (getPeerSharingRegistry daPeerSharingRegistry) }
WithLedgerPeersArgs {
wlpRng = ledgerPeersRng,
wlpConsensusInterface = daLedgerPeersCtx,
wlpTracer = dtTraceLedgerPeersTracer,
wlpGetUseLedgerPeers = daReadUseLedgerPeers }

peerSelectionGovernor'
:: forall (muxMode :: MuxMode) b.
Expand Down Expand Up @@ -1072,67 +1068,49 @@ runM Interfaces
diInstallSigUSR1Handler connectionManager debugStateVar daPeerMetrics
withPeerStateActions' connectionManager $ \peerStateActions->
withPeerSelectionActions'
retry
peerStateActions
ledgerPeersRng
daLedgerPeersCtx
daReadUseLedgerPeers
$ \(ledgerPeersThread, localRootPeersProvider) peerSelectionActions->

Async.withAsync
(peerSelectionGovernor'
PeerSelectionActionsDiffusionMode {
psNewInboundConnections = retry,
psPeerStateActions = peerStateActions } $
\(ledgerPeersThread, localRootPeersProvider) peerSelectionActions->
Async.withAsync
(peerSelectionGovernor'
dtDebugPeerSelectionInitiatorTracer
debugStateVar
peerSelectionActions)
$ \governorThread ->
Async.withAsync peerChurnGovernor' $ \churnGovernorThread ->
-- wait for any thread to fail:
snd <$> Async.waitAny
[ ledgerPeersThread
, localRootPeersProvider
, governorThread
, churnGovernorThread
]
peerSelectionActions) $ \governorThread ->
Async.withAsync
peerChurnGovernor' $ \churnGovernorThread ->
-- wait for any thread to fail:
snd <$> Async.waitAny
[ledgerPeersThread, localRootPeersProvider, governorThread, churnGovernorThread]

-- InitiatorAndResponder mode, run peer selection and the server:
InitiatorAndResponderDiffusionMode -> do
inboundInfoChannel <- newInformationChannel
outboundInfoChannel <- newInformationChannel
observableStateVar <- Server.newObservableStateVar ntnInbgovRng
withConnectionManagerInitiatorAndResponderMode
inboundInfoChannel outboundInfoChannel
inboundInfoChannel
outboundInfoChannel
observableStateVar $ \connectionManager-> do
debugStateVar <- newTVarIO $ emptyPeerSelectionState fuzzRng []
diInstallSigUSR1Handler connectionManager debugStateVar daPeerMetrics
withPeerStateActions' connectionManager $ \peerStateActions->
withPeerStateActions' connectionManager $ \peerStateActions ->
withPeerSelectionActions'
(readMessage outboundInfoChannel)
peerStateActions
ledgerPeersRng
daLedgerPeersCtx
daReadUseLedgerPeers
$ \(ledgerPeersThread, localRootPeersProvider) peerSelectionActions->
Async.withAsync
(peerSelectionGovernor'
dtDebugPeerSelectionInitiatorResponderTracer
debugStateVar
peerSelectionActions) $ \governorThread ->
-- begin, unique to InitiatorAndResponder mode:
withSockets' $ \sockets addresses -> do
traceWith tracer (RunServer addresses)
Async.withAsync
(serverRun' sockets connectionManager inboundInfoChannel
observableStateVar) $ \serverThread ->
-- end, unique to ...
Async.withAsync peerChurnGovernor' $ \churnGovernorThread ->
-- wait for any thread to fail:
snd <$> Async.waitAny
[ ledgerPeersThread
, localRootPeersProvider
, governorThread
, churnGovernorThread
, serverThread
]
PeerSelectionActionsDiffusionMode {
psNewInboundConnections = readMessage outboundInfoChannel,
psPeerStateActions = peerStateActions } $
\(ledgerPeersThread, localRootPeersProvider) peerSelectionActions->
Async.withAsync
(peerSelectionGovernor' dtDebugPeerSelectionInitiatorResponderTracer debugStateVar peerSelectionActions) $ \governorThread ->
-- begin, unique to InitiatorAndResponder mode:
withSockets' $ \sockets addresses -> do
traceWith tracer (RunServer addresses)
Async.withAsync
(serverRun' sockets connectionManager inboundInfoChannel observableStateVar) $ \serverThread ->
-- end, unique to ...
Async.withAsync peerChurnGovernor' $ \churnGovernorThread ->
-- wait for any thread to fail:
snd <$> Async.waitAny [ledgerPeersThread, localRootPeersProvider, governorThread, churnGovernorThread, serverThread]

-- | Main entry point for data diffusion service. It allows to:
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

Expand All @@ -29,6 +28,7 @@ module Ouroboros.Network.PeerSelection.LedgerPeers
, accBigPoolStake
, bigLedgerPeerQuota
-- * DNS based provider for ledger root peers
, WithLedgerPeersArgs (..)
, withLedgerPeers
-- Re-exports for testing purposes
, module Ouroboros.Network.PeerSelection.LedgerPeers.Type
Expand Down Expand Up @@ -65,8 +65,8 @@ import Ouroboros.Network.PeerSelection.LedgerPeers.Common
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
import Ouroboros.Network.PeerSelection.RelayAccessPoint
import Ouroboros.Network.PeerSelection.RelayAccessPoint qualified as Socket
import Ouroboros.Network.PeerSelection.RootPeersDNS
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSSemaphore
import Ouroboros.Network.PeerSelection.RootPeersDNS.LedgerPeers
(resolveLedgerPeers)

Expand Down Expand Up @@ -394,6 +394,18 @@ ledgerPeersThread inRng dnsSemaphore toPeerAddr tracer readUseLedgerAfter
addrs' = Set.insert addr addrs
in (addrs', domains)

-- | Argument record for withLedgerPeers
--
data WithLedgerPeersArgs m = WithLedgerPeersArgs {
wlpRng :: StdGen,
-- ^ Random generator for picking ledger peers
wlpConsensusInterface :: LedgerPeersConsensusInterface m,
wlpTracer :: Tracer m TraceLedgerPeers,
-- ^ Get Ledger Peers comes from here
wlpGetUseLedgerPeers :: STM m UseLedgerPeers
-- ^ Get Use Ledger After value
}

-- | For a LedgerPeers worker thread and submit request and receive responses.
--
withLedgerPeers :: forall peerAddr resolver exception m a.
Expand All @@ -403,18 +415,15 @@ withLedgerPeers :: forall peerAddr resolver exception m a.
, Exception exception
, Ord peerAddr
)
=> StdGen
-> DNSSemaphore m
-> (IP.IP -> Socket.PortNumber -> peerAddr)
-> Tracer m TraceLedgerPeers
-> STM m UseLedgerPeers
-> LedgerPeersConsensusInterface m
-> DNSActions resolver exception m
-> ( (NumberOfPeers -> LedgerPeersKind -> m (Maybe (Set peerAddr, DiffTime)))
=> PeerActionsDNS peerAddr resolver exception m
-> WithLedgerPeersArgs m
-> ((NumberOfPeers -> LedgerPeersKind -> m (Maybe (Set peerAddr, DiffTime)))
-> Async m Void
-> m a )
-> m a
withLedgerPeers inRng dnsSemaphore toPeerAddr tracer readUseLedgerPeers interface dnsActions k = do
withLedgerPeers PeerActionsDNS { paToPeerAddr = toPeerAddr, paDnsActions = dnsActions, paDnsSemaphore = dnsSemaphore }
WithLedgerPeersArgs { wlpRng = inRng, wlpConsensusInterface = interface, wlpTracer = tracer, wlpGetUseLedgerPeers = readUseLedgerPeers }
k = do
reqVar <- newEmptyTMVarIO
respVar <- newEmptyTMVarIO
let getRequest = takeTMVar reqVar
Expand Down
Loading

0 comments on commit 8ab4448

Please sign in to comment.