Skip to content

Commit

Permalink
refactor withPeerSelectionAction argument list
Browse files Browse the repository at this point in the history
  • Loading branch information
crocodile-dentist committed Feb 14, 2024
1 parent f83f71a commit e215ed3
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 186 deletions.
1 change: 1 addition & 0 deletions ouroboros-network/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Updated type of constructor in `TraceLocalRootPeers`
* Added `TraceDebugState` message to `TracePeerSelection` for tracing
peer selection upon getting a USR1 sig.
* Changed withPeerSelectionActions and withLedgerPeers signatures

### Non-breaking changes

Expand Down
1 change: 1 addition & 0 deletions ouroboros-network/ouroboros-network.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ library
Ouroboros.Network.PeerSelection.State.EstablishedPeers
Ouroboros.Network.PeerSelection.State.KnownPeers
Ouroboros.Network.PeerSelection.State.LocalRootPeers
Ouroboros.Network.PeerSelection.RootPeersDNS
Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions
Ouroboros.Network.PeerSelection.RootPeersDNS.DNSSemaphore
Ouroboros.Network.PeerSelection.RootPeersDNS.LocalRootPeers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import Cardano.Slotting.Slot (SlotNo)
import Control.Concurrent.Class.MonadSTM.Strict
import Ouroboros.Network.PeerSelection.LedgerPeers
import Ouroboros.Network.PeerSelection.RelayAccessPoint
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSSemaphore
import Ouroboros.Network.PeerSelection.RootPeersDNS
import Ouroboros.Network.Testing.Data.Script
import Test.Ouroboros.Network.PeerSelection.RootPeersDNS
import Test.QuickCheck
Expand Down Expand Up @@ -186,10 +186,10 @@ prop_pick100 seed (NonNegative n) (ArbLedgerPeersKind ledgerPeersKind) (MockRoot
dnsSemaphore <- newLedgerAndPublicRootDNSSemaphore

withLedgerPeers
rng dnsSemaphore (curry IP.toSockAddr) verboseTracer
(pure (UseLedgerPeers (After 0)))
interface
(mockDNSActions @SomeException dnsMapVar dnsTimeoutScriptVar dnsLookupDelayScriptVar)
(PeerActionsDNS (curry IP.toSockAddr)
(mockDNSActions @SomeException dnsMapVar dnsTimeoutScriptVar dnsLookupDelayScriptVar)
dnsSemaphore)
(WithLedgerPeersRec rng interface verboseTracer (pure $ UseLedgerPeers (After 0)))
(\request _ -> do
threadDelay 1900 -- we need to invalidate ledger peer's cache
resp <- request (NumberOfPeers 1) ledgerPeersKind
Expand Down Expand Up @@ -243,10 +243,10 @@ prop_pick (LedgerPools lps) (ArbLedgerPeersKind ledgerPeersKind) count seed (Moc
dnsSemaphore <- newLedgerAndPublicRootDNSSemaphore

withLedgerPeers
rng dnsSemaphore (curry IP.toSockAddr) verboseTracer
(pure (UseLedgerPeers (After 0)))
interface
(mockDNSActions @SomeException dnsMapVar dnsTimeoutScriptVar dnsLookupDelayScriptVar)
(PeerActionsDNS (curry IP.toSockAddr)
(mockDNSActions @SomeException dnsMapVar dnsTimeoutScriptVar dnsLookupDelayScriptVar)
dnsSemaphore)
(WithLedgerPeersRec rng interface verboseTracer (pure $ UseLedgerPeers (After 0)))
(\request _ -> do
threadDelay 1900 -- we need to invalidate ledger peer's cache
resp <- request (NumberOfPeers count) ledgerPeersKind
Expand Down
125 changes: 47 additions & 78 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,15 @@ import Ouroboros.Network.PeerSelection.Governor qualified as Governor
import Ouroboros.Network.PeerSelection.Governor.Types
(ChurnMode (ChurnModeNormal), DebugPeerSelection (..),
PeerSelectionActions, PeerSelectionCounters (..),
PeerSelectionPolicy (..), PeerSelectionState, PeerStateActions,
PeerSelectionPolicy (..), PeerSelectionState,
PublicPeerSelectionState (..), TracePeerSelection (..),
emptyPeerSelectionState, emptyPublicPeerSelectionState)
#ifdef POSIX
import Ouroboros.Network.PeerSelection.Governor.Types
(makeDebugPeerSelectionState)
#endif
import Ouroboros.Network.PeerSelection.LedgerPeers
(LedgerPeersConsensusInterface, TraceLedgerPeers)
(TraceLedgerPeers, WithLedgerPeersRec (..))
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
(LedgerPeersConsensusInterface (..), UseLedgerPeers)
#ifdef POSIX
Expand All @@ -129,6 +129,7 @@ import Ouroboros.Network.PeerSelection.PeerStateActions (PeerConnectionHandle,
pchPeerSharing, withPeerStateActions)
import Ouroboros.Network.PeerSelection.PeerTrustable (PeerTrustable)
import Ouroboros.Network.PeerSelection.RelayAccessPoint (RelayAccessPoint)
import Ouroboros.Network.PeerSelection.RootPeersDNS
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions (DNSActions,
DNSLookupType (..), ioDNSActions)
import Ouroboros.Network.PeerSelection.RootPeersDNS.LocalRootPeers
Expand Down Expand Up @@ -950,24 +951,14 @@ runM Interfaces
spsExitPolicy = exitPolicy
}

dnsSemaphore <- newLedgerAndPublicRootDNSSemaphore
--
-- Run peer selection (p2p governor)
--
let withPeerSelectionActions'
:: forall muxMode responderCtx peerAddr bytes a1 b c.
STM m (ntnAddr, PeerSharing)
-- ^ Read New Inbound Connections
-> PeerStateActions
ntnAddr
(PeerConnectionHandle
muxMode responderCtx peerAddr ntnVersionData bytes m a1 b)
m
-> StdGen
-- ^ Random generator for picking ledger peers
-> LedgerPeersConsensusInterface m
-- ^ Get Ledger Peers comes from here
-> STM m UseLedgerPeers
-- ^ Get Use Ledger After value
PeerSelectionActionsExtraRec ntnAddr (PeerConnectionHandle muxMode responderCtx peerAddr ntnVersionData bytes m a1 b) m
-> WithLedgerPeersRec m
-> ( (Async m Void, Async m Void)
-> PeerSelectionActions
ntnAddr
Expand All @@ -979,21 +970,19 @@ runM Interfaces
-- (only if local root peers were non-empty).
-> m c
withPeerSelectionActions' =
withPeerSelectionActions
dtTraceLocalRootPeersTracer
dtTracePublicRootPeersTracer
dtTraceLedgerPeersTracer
diNtnToPeerAddr
(diDnsActions lookupReqs)
(readTVar peerSelectionTargetsVar)
lpGetLedgerStateJudgement
daReadLocalRootPeers
daReadPublicRootPeers
daReadUseBootstrapPeers
daOwnPeerSharing
(pchPeerSharing diNtnPeerSharing)
(readTVar (getPeerSharingRegistry daPeerSharingRegistry))

withPeerSelectionActions (PeerSelectionActionsRec
dtTraceLocalRootPeersTracer
dtTracePublicRootPeersTracer
(readTVar peerSelectionTargetsVar)
lpGetLedgerStateJudgement
daReadLocalRootPeers
daReadPublicRootPeers
daReadUseBootstrapPeers
daOwnPeerSharing
(pchPeerSharing diNtnPeerSharing)
(readTVar (getPeerSharingRegistry daPeerSharingRegistry)))
(PeerActionsDNS diNtnToPeerAddr (diDnsActions lookupReqs) dnsSemaphore)

peerSelectionGovernor'
:: forall (muxMode :: MuxMode) b.
Tracer m (DebugPeerSelection ntnAddr)
Expand Down Expand Up @@ -1072,67 +1061,47 @@ runM Interfaces
diInstallSigUSR1Handler connectionManager debugStateVar daPeerMetrics
withPeerStateActions' connectionManager $ \peerStateActions->
withPeerSelectionActions'
retry
peerStateActions
ledgerPeersRng
daLedgerPeersCtx
daReadUseLedgerPeers
$ \(ledgerPeersThread, localRootPeersProvider) peerSelectionActions->

Async.withAsync
(peerSelectionGovernor'
(PeerSelectionActionsExtraRec retry peerStateActions)
(WithLedgerPeersRec ledgerPeersRng daLedgerPeersCtx dtTraceLedgerPeersTracer daReadUseLedgerPeers) $
\(ledgerPeersThread, localRootPeersProvider) peerSelectionActions->
Async.withAsync
(peerSelectionGovernor'
dtDebugPeerSelectionInitiatorTracer
debugStateVar
peerSelectionActions)
$ \governorThread ->
Async.withAsync peerChurnGovernor' $ \churnGovernorThread ->
-- wait for any thread to fail:
snd <$> Async.waitAny
[ ledgerPeersThread
, localRootPeersProvider
, governorThread
, churnGovernorThread
]
peerSelectionActions) $ \governorThread ->
Async.withAsync
peerChurnGovernor' $ \churnGovernorThread ->
-- wait for any thread to fail:
snd <$> Async.waitAny
[ledgerPeersThread, localRootPeersProvider, governorThread, churnGovernorThread]

-- InitiatorAndResponder mode, run peer selection and the server:
InitiatorAndResponderDiffusionMode -> do
inboundInfoChannel <- newInformationChannel
outboundInfoChannel <- newInformationChannel
observableStateVar <- Server.newObservableStateVar ntnInbgovRng
withConnectionManagerInitiatorAndResponderMode
inboundInfoChannel outboundInfoChannel
inboundInfoChannel
outboundInfoChannel
observableStateVar $ \connectionManager-> do
debugStateVar <- newTVarIO $ emptyPeerSelectionState fuzzRng []
diInstallSigUSR1Handler connectionManager debugStateVar daPeerMetrics
withPeerStateActions' connectionManager $ \peerStateActions->
withPeerStateActions' connectionManager $ \peerStateActions ->
withPeerSelectionActions'
(readMessage outboundInfoChannel)
peerStateActions
ledgerPeersRng
daLedgerPeersCtx
daReadUseLedgerPeers
$ \(ledgerPeersThread, localRootPeersProvider) peerSelectionActions->
Async.withAsync
(peerSelectionGovernor'
dtDebugPeerSelectionInitiatorResponderTracer
debugStateVar
peerSelectionActions) $ \governorThread ->
-- begin, unique to InitiatorAndResponder mode:
withSockets' $ \sockets addresses -> do
traceWith tracer (RunServer addresses)
Async.withAsync
(serverRun' sockets connectionManager inboundInfoChannel
observableStateVar) $ \serverThread ->
-- end, unique to ...
Async.withAsync peerChurnGovernor' $ \churnGovernorThread ->
-- wait for any thread to fail:
snd <$> Async.waitAny
[ ledgerPeersThread
, localRootPeersProvider
, governorThread
, churnGovernorThread
, serverThread
]
(PeerSelectionActionsExtraRec (readMessage outboundInfoChannel) peerStateActions)
(WithLedgerPeersRec ledgerPeersRng daLedgerPeersCtx dtTraceLedgerPeersTracer daReadUseLedgerPeers) $
\(ledgerPeersThread, localRootPeersProvider) peerSelectionActions->
Async.withAsync
(peerSelectionGovernor' dtDebugPeerSelectionInitiatorResponderTracer debugStateVar peerSelectionActions) $ \governorThread ->
-- begin, unique to InitiatorAndResponder mode:
withSockets' $ \sockets addresses -> do
traceWith tracer (RunServer addresses)
Async.withAsync
(serverRun' sockets connectionManager inboundInfoChannel observableStateVar) $ \serverThread ->
-- end, unique to ...
Async.withAsync peerChurnGovernor' $ \churnGovernorThread ->
-- wait for any thread to fail:
snd <$> Async.waitAny [ledgerPeersThread, localRootPeersProvider, governorThread, churnGovernorThread, serverThread]

-- | Main entry point for data diffusion service. It allows to:
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

Expand All @@ -29,6 +28,7 @@ module Ouroboros.Network.PeerSelection.LedgerPeers
, accBigPoolStake
, bigLedgerPeerQuota
-- * DNS based provider for ledger root peers
, WithLedgerPeersRec (..)
, withLedgerPeers
-- Re-exports for testing purposes
, module Ouroboros.Network.PeerSelection.LedgerPeers.Type
Expand Down Expand Up @@ -65,8 +65,8 @@ import Ouroboros.Network.PeerSelection.LedgerPeers.Common
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
import Ouroboros.Network.PeerSelection.RelayAccessPoint
import Ouroboros.Network.PeerSelection.RelayAccessPoint qualified as Socket
import Ouroboros.Network.PeerSelection.RootPeersDNS
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSSemaphore
import Ouroboros.Network.PeerSelection.RootPeersDNS.LedgerPeers
(resolveLedgerPeers)

Expand Down Expand Up @@ -392,6 +392,17 @@ ledgerPeersThread inRng dnsSemaphore toPeerAddr tracer readUseLedgerAfter
addrs' = Set.insert addr addrs
in (addrs', domains)


data WithLedgerPeersRec m = WithLedgerPeersRec {
ledgerPeersRng :: StdGen,
-- ^ Random generator for picking ledger peers
ledgerPeersConsensusInterface :: LedgerPeersConsensusInterface m,
ledgerPeersTracer :: Tracer m TraceLedgerPeers,
-- ^ Get Ledger Peers comes from here
getUseLedgerPeers :: STM m UseLedgerPeers
-- ^ Get Use Ledger After value
}

-- | For a LedgerPeers worker thread and submit request and receive responses.
--
withLedgerPeers :: forall peerAddr resolver exception m a.
Expand All @@ -401,18 +412,15 @@ withLedgerPeers :: forall peerAddr resolver exception m a.
, Exception exception
, Ord peerAddr
)
=> StdGen
-> DNSSemaphore m
-> (IP.IP -> Socket.PortNumber -> peerAddr)
-> Tracer m TraceLedgerPeers
-> STM m UseLedgerPeers
-> LedgerPeersConsensusInterface m
-> DNSActions resolver exception m
-> ( (NumberOfPeers -> LedgerPeersKind -> m (Maybe (Set peerAddr, DiffTime)))
=> PeerActionsDNS peerAddr resolver exception m
-> WithLedgerPeersRec m
-> ((NumberOfPeers -> LedgerPeersKind -> m (Maybe (Set peerAddr, DiffTime)))
-> Async m Void
-> m a )
-> m a
withLedgerPeers inRng dnsSemaphore toPeerAddr tracer readUseLedgerPeers interface dnsActions k = do
withLedgerPeers PeerActionsDNS { toPeerAddr, dnsActions, dnsSemaphore }
WithLedgerPeersRec { ledgerPeersRng = inRng, ledgerPeersConsensusInterface = interface, ledgerPeersTracer = tracer, getUseLedgerPeers = readUseLedgerPeers }
k = do
reqVar <- newEmptyTMVarIO
respVar <- newEmptyTMVarIO
let getRequest = takeTMVar reqVar
Expand Down
Loading

0 comments on commit e215ed3

Please sign in to comment.