Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor withPeerSelectionAction argument list #4803

Merged
merged 2 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ouroboros-network/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Updated type of constructor in `TraceLocalRootPeers`
* Added `TraceDebugState` message to `TracePeerSelection` for tracing
peer selection upon getting a USR1 sig.
* Changed withPeerSelectionActions and withLedgerPeers signatures

### Non-breaking changes

Expand Down
1 change: 1 addition & 0 deletions ouroboros-network/ouroboros-network.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ library
Ouroboros.Network.PeerSelection.State.EstablishedPeers
Ouroboros.Network.PeerSelection.State.KnownPeers
Ouroboros.Network.PeerSelection.State.LocalRootPeers
Ouroboros.Network.PeerSelection.RootPeersDNS
Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions
Ouroboros.Network.PeerSelection.RootPeersDNS.DNSSemaphore
Ouroboros.Network.PeerSelection.RootPeersDNS.LocalRootPeers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import Cardano.Slotting.Slot (SlotNo, WithOrigin (..))
import Control.Concurrent.Class.MonadSTM.Strict
import Ouroboros.Network.PeerSelection.LedgerPeers
import Ouroboros.Network.PeerSelection.RelayAccessPoint
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSSemaphore
import Ouroboros.Network.PeerSelection.RootPeersDNS
import Ouroboros.Network.Testing.Data.Script
import Test.Ouroboros.Network.PeerSelection.RootPeersDNS
import Test.QuickCheck
Expand Down Expand Up @@ -186,10 +186,13 @@ prop_pick100 seed (NonNegative n) (ArbLedgerPeersKind ledgerPeersKind) (MockRoot
dnsSemaphore <- newLedgerAndPublicRootDNSSemaphore

withLedgerPeers
rng dnsSemaphore (curry IP.toSockAddr) verboseTracer
(pure (UseLedgerPeers Always))
interface
(mockDNSActions @SomeException dnsMapVar dnsTimeoutScriptVar dnsLookupDelayScriptVar)
PeerActionsDNS { paToPeerAddr = curry IP.toSockAddr,
paDnsActions = (mockDNSActions @SomeException dnsMapVar dnsTimeoutScriptVar dnsLookupDelayScriptVar),
paDnsSemaphore = dnsSemaphore }
WithLedgerPeersArgs { wlpRng = rng,
wlpConsensusInterface = interface,
wlpTracer = verboseTracer,
wlpGetUseLedgerPeers = pure $ UseLedgerPeers Always }
(\request _ -> do
threadDelay 1900 -- we need to invalidate ledger peer's cache
resp <- request (NumberOfPeers 1) ledgerPeersKind
Expand Down Expand Up @@ -243,10 +246,13 @@ prop_pick (LedgerPools lps) (ArbLedgerPeersKind ledgerPeersKind) count seed (Moc
dnsSemaphore <- newLedgerAndPublicRootDNSSemaphore

withLedgerPeers
rng dnsSemaphore (curry IP.toSockAddr) verboseTracer
(pure (UseLedgerPeers (After 0)))
interface
(mockDNSActions @SomeException dnsMapVar dnsTimeoutScriptVar dnsLookupDelayScriptVar)
PeerActionsDNS { paToPeerAddr = curry IP.toSockAddr,
paDnsActions = mockDNSActions @SomeException dnsMapVar dnsTimeoutScriptVar dnsLookupDelayScriptVar,
paDnsSemaphore = dnsSemaphore }
WithLedgerPeersArgs { wlpRng = rng,
wlpConsensusInterface = interface,
wlpTracer = verboseTracer,
wlpGetUseLedgerPeers = pure $ UseLedgerPeers (After 0) }
(\request _ -> do
threadDelay 1900 -- we need to invalidate ledger peer's cache
resp <- request (NumberOfPeers count) ledgerPeersKind
Expand Down
134 changes: 56 additions & 78 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,15 @@ import Ouroboros.Network.PeerSelection.Governor qualified as Governor
import Ouroboros.Network.PeerSelection.Governor.Types
(ChurnMode (ChurnModeNormal), DebugPeerSelection (..),
PeerSelectionActions, PeerSelectionCounters (..),
PeerSelectionPolicy (..), PeerSelectionState, PeerStateActions,
PeerSelectionPolicy (..), PeerSelectionState,
PublicPeerSelectionState (..), TracePeerSelection (..),
emptyPeerSelectionState, emptyPublicPeerSelectionState)
#ifdef POSIX
import Ouroboros.Network.PeerSelection.Governor.Types
(makeDebugPeerSelectionState)
#endif
import Ouroboros.Network.PeerSelection.LedgerPeers
(LedgerPeersConsensusInterface, TraceLedgerPeers)
import Ouroboros.Network.PeerSelection.LedgerPeers (TraceLedgerPeers,
WithLedgerPeersArgs (..))
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
(LedgerPeersConsensusInterface (..), UseLedgerPeers)
#ifdef POSIX
Expand All @@ -129,6 +129,7 @@ import Ouroboros.Network.PeerSelection.PeerStateActions (PeerConnectionHandle,
pchPeerSharing, withPeerStateActions)
import Ouroboros.Network.PeerSelection.PeerTrustable (PeerTrustable)
import Ouroboros.Network.PeerSelection.RelayAccessPoint (RelayAccessPoint)
import Ouroboros.Network.PeerSelection.RootPeersDNS
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions (DNSActions,
DNSLookupType (..), ioDNSActions)
import Ouroboros.Network.PeerSelection.RootPeersDNS.LocalRootPeers
Expand Down Expand Up @@ -950,24 +951,13 @@ runM Interfaces
spsExitPolicy = exitPolicy
}

dnsSemaphore <- newLedgerAndPublicRootDNSSemaphore
--
-- Run peer selection (p2p governor)
--
let withPeerSelectionActions'
:: forall muxMode responderCtx peerAddr bytes a1 b c.
STM m (ntnAddr, PeerSharing)
-- ^ Read New Inbound Connections
-> PeerStateActions
ntnAddr
(PeerConnectionHandle
muxMode responderCtx peerAddr ntnVersionData bytes m a1 b)
m
-> StdGen
-- ^ Random generator for picking ledger peers
-> LedgerPeersConsensusInterface m
-- ^ Get Ledger Peers comes from here
-> STM m UseLedgerPeers
-- ^ Get Use Ledger After value
PeerSelectionActionsDiffusionMode ntnAddr (PeerConnectionHandle muxMode responderCtx peerAddr ntnVersionData bytes m a1 b) m
-> ( (Async m Void, Async m Void)
-> PeerSelectionActions
ntnAddr
Expand All @@ -979,20 +969,26 @@ runM Interfaces
-- (only if local root peers were non-empty).
-> m c
withPeerSelectionActions' =
withPeerSelectionActions
dtTraceLocalRootPeersTracer
dtTracePublicRootPeersTracer
dtTraceLedgerPeersTracer
diNtnToPeerAddr
(diDnsActions lookupReqs)
(readTVar peerSelectionTargetsVar)
lpGetLedgerStateJudgement
daReadLocalRootPeers
daReadPublicRootPeers
daReadUseBootstrapPeers
daOwnPeerSharing
(pchPeerSharing diNtnPeerSharing)
(readTVar (getPeerSharingRegistry daPeerSharingRegistry))
withPeerSelectionActions PeerActionsDNS {
paToPeerAddr = diNtnToPeerAddr,
paDnsActions = diDnsActions lookupReqs,
paDnsSemaphore = dnsSemaphore }
PeerSelectionActionsArgs {
psLocalRootPeersTracer = dtTraceLocalRootPeersTracer,
psPublicRootPeersTracer = dtTracePublicRootPeersTracer,
psReadTargets = readTVar peerSelectionTargetsVar,
psJudgement = lpGetLedgerStateJudgement,
psReadLocalRootPeers = daReadLocalRootPeers,
psReadPublicRootPeers = daReadPublicRootPeers,
psReadUseBootstrapPeers = daReadUseBootstrapPeers,
psPeerSharing = daOwnPeerSharing,
psPeerConnToPeerSharing = pchPeerSharing diNtnPeerSharing,
psReadPeerSharingController = readTVar (getPeerSharingRegistry daPeerSharingRegistry) }
WithLedgerPeersArgs {
wlpRng = ledgerPeersRng,
wlpConsensusInterface = daLedgerPeersCtx,
wlpTracer = dtTraceLedgerPeersTracer,
wlpGetUseLedgerPeers = daReadUseLedgerPeers }

peerSelectionGovernor'
:: forall (muxMode :: MuxMode) b.
Expand Down Expand Up @@ -1072,67 +1068,49 @@ runM Interfaces
diInstallSigUSR1Handler connectionManager debugStateVar daPeerMetrics
withPeerStateActions' connectionManager $ \peerStateActions->
withPeerSelectionActions'
retry
peerStateActions
ledgerPeersRng
daLedgerPeersCtx
daReadUseLedgerPeers
$ \(ledgerPeersThread, localRootPeersProvider) peerSelectionActions->

Async.withAsync
(peerSelectionGovernor'
PeerSelectionActionsDiffusionMode {
psNewInboundConnections = retry,
psPeerStateActions = peerStateActions } $
\(ledgerPeersThread, localRootPeersProvider) peerSelectionActions->
Async.withAsync
(peerSelectionGovernor'
dtDebugPeerSelectionInitiatorTracer
debugStateVar
peerSelectionActions)
$ \governorThread ->
Async.withAsync peerChurnGovernor' $ \churnGovernorThread ->
-- wait for any thread to fail:
snd <$> Async.waitAny
[ ledgerPeersThread
, localRootPeersProvider
, governorThread
, churnGovernorThread
]
peerSelectionActions) $ \governorThread ->
Async.withAsync
peerChurnGovernor' $ \churnGovernorThread ->
-- wait for any thread to fail:
snd <$> Async.waitAny
[ledgerPeersThread, localRootPeersProvider, governorThread, churnGovernorThread]

-- InitiatorAndResponder mode, run peer selection and the server:
InitiatorAndResponderDiffusionMode -> do
inboundInfoChannel <- newInformationChannel
outboundInfoChannel <- newInformationChannel
observableStateVar <- Server.newObservableStateVar ntnInbgovRng
withConnectionManagerInitiatorAndResponderMode
inboundInfoChannel outboundInfoChannel
inboundInfoChannel
outboundInfoChannel
observableStateVar $ \connectionManager-> do
debugStateVar <- newTVarIO $ emptyPeerSelectionState fuzzRng []
diInstallSigUSR1Handler connectionManager debugStateVar daPeerMetrics
withPeerStateActions' connectionManager $ \peerStateActions->
withPeerStateActions' connectionManager $ \peerStateActions ->
withPeerSelectionActions'
(readMessage outboundInfoChannel)
peerStateActions
ledgerPeersRng
daLedgerPeersCtx
daReadUseLedgerPeers
$ \(ledgerPeersThread, localRootPeersProvider) peerSelectionActions->
Async.withAsync
(peerSelectionGovernor'
dtDebugPeerSelectionInitiatorResponderTracer
debugStateVar
peerSelectionActions) $ \governorThread ->
-- begin, unique to InitiatorAndResponder mode:
withSockets' $ \sockets addresses -> do
traceWith tracer (RunServer addresses)
Async.withAsync
(serverRun' sockets connectionManager inboundInfoChannel
observableStateVar) $ \serverThread ->
-- end, unique to ...
Async.withAsync peerChurnGovernor' $ \churnGovernorThread ->
-- wait for any thread to fail:
snd <$> Async.waitAny
[ ledgerPeersThread
, localRootPeersProvider
, governorThread
, churnGovernorThread
, serverThread
]
PeerSelectionActionsDiffusionMode {
psNewInboundConnections = readMessage outboundInfoChannel,
psPeerStateActions = peerStateActions } $
\(ledgerPeersThread, localRootPeersProvider) peerSelectionActions->
Async.withAsync
(peerSelectionGovernor' dtDebugPeerSelectionInitiatorResponderTracer debugStateVar peerSelectionActions) $ \governorThread ->
-- begin, unique to InitiatorAndResponder mode:
withSockets' $ \sockets addresses -> do
traceWith tracer (RunServer addresses)
Async.withAsync
(serverRun' sockets connectionManager inboundInfoChannel observableStateVar) $ \serverThread ->
-- end, unique to ...
Async.withAsync peerChurnGovernor' $ \churnGovernorThread ->
-- wait for any thread to fail:
snd <$> Async.waitAny [ledgerPeersThread, localRootPeersProvider, governorThread, churnGovernorThread, serverThread]

-- | Main entry point for data diffusion service. It allows to:
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

Expand All @@ -29,6 +28,7 @@ module Ouroboros.Network.PeerSelection.LedgerPeers
, accBigPoolStake
, bigLedgerPeerQuota
-- * DNS based provider for ledger root peers
, WithLedgerPeersArgs (..)
, withLedgerPeers
-- Re-exports for testing purposes
, module Ouroboros.Network.PeerSelection.LedgerPeers.Type
Expand Down Expand Up @@ -65,8 +65,8 @@ import Ouroboros.Network.PeerSelection.LedgerPeers.Common
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
import Ouroboros.Network.PeerSelection.RelayAccessPoint
import Ouroboros.Network.PeerSelection.RelayAccessPoint qualified as Socket
import Ouroboros.Network.PeerSelection.RootPeersDNS
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSSemaphore
import Ouroboros.Network.PeerSelection.RootPeersDNS.LedgerPeers
(resolveLedgerPeers)

Expand Down Expand Up @@ -394,6 +394,18 @@ ledgerPeersThread inRng dnsSemaphore toPeerAddr tracer readUseLedgerAfter
addrs' = Set.insert addr addrs
in (addrs', domains)

-- | Argument record for withLedgerPeers
--
data WithLedgerPeersArgs m = WithLedgerPeersArgs {
wlpRng :: StdGen,
-- ^ Random generator for picking ledger peers
wlpConsensusInterface :: LedgerPeersConsensusInterface m,
wlpTracer :: Tracer m TraceLedgerPeers,
-- ^ Get Ledger Peers comes from here
wlpGetUseLedgerPeers :: STM m UseLedgerPeers
-- ^ Get Use Ledger After value
}

-- | For a LedgerPeers worker thread and submit request and receive responses.
--
withLedgerPeers :: forall peerAddr resolver exception m a.
Expand All @@ -403,18 +415,15 @@ withLedgerPeers :: forall peerAddr resolver exception m a.
, Exception exception
, Ord peerAddr
)
=> StdGen
-> DNSSemaphore m
-> (IP.IP -> Socket.PortNumber -> peerAddr)
-> Tracer m TraceLedgerPeers
-> STM m UseLedgerPeers
-> LedgerPeersConsensusInterface m
-> DNSActions resolver exception m
-> ( (NumberOfPeers -> LedgerPeersKind -> m (Maybe (Set peerAddr, DiffTime)))
=> PeerActionsDNS peerAddr resolver exception m
-> WithLedgerPeersArgs m
-> ((NumberOfPeers -> LedgerPeersKind -> m (Maybe (Set peerAddr, DiffTime)))
-> Async m Void
-> m a )
-> m a
withLedgerPeers inRng dnsSemaphore toPeerAddr tracer readUseLedgerPeers interface dnsActions k = do
withLedgerPeers PeerActionsDNS { paToPeerAddr = toPeerAddr, paDnsActions = dnsActions, paDnsSemaphore = dnsSemaphore }
WithLedgerPeersArgs { wlpRng = inRng, wlpConsensusInterface = interface, wlpTracer = tracer, wlpGetUseLedgerPeers = readUseLedgerPeers }
k = do
reqVar <- newEmptyTMVarIO
respVar <- newEmptyTMVarIO
let getRequest = takeTMVar reqVar
Expand Down
Loading
Loading