Skip to content

Commit

Permalink
Added Light Peer Sharing
Browse files Browse the repository at this point in the history
Added a Control Channel between the Inbound Governor and the
OutboundGovernor. Reading from this channel is abstracted as a
PeerSelectionAction. When the InboundGovernor receives an Inbound
NewConnection it will write the address and PeerSharing Willingness
information to the Channel.
  • Loading branch information
bolt12 committed Jan 12, 2023
1 parent 4578ccb commit e241bce
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 51 deletions.
5 changes: 5 additions & 0 deletions ouroboros-network-framework/demo/connection-manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ import qualified Ouroboros.Network.InboundGovernor.ControlChannel as Server
import Ouroboros.Network.IOManager
import Ouroboros.Network.Mux
import Ouroboros.Network.MuxMode
import Ouroboros.Network.PeerSelection.PeerSharing.Type
(PeerSharing (..))
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Codec
(timeLimitsHandshake)
Expand Down Expand Up @@ -207,6 +209,7 @@ withBidirectionalConnectionManager snocket socket
k = do
mainThreadId <- myThreadId
inbgovControlChannel <- Server.newControlChannel
outgovControlChannel <- Server.newControlChannel
-- as in the 'withInitiatorOnlyConnectionManager' we use a `StrictTVar` to
-- pass list of requests, but since we are also interested in the results we
-- need to have multable cells to pass the accumulators around.
Expand Down Expand Up @@ -272,6 +275,8 @@ withBidirectionalConnectionManager snocket socket
serverConnectionManager = connectionManager,
serverInboundIdleTimeout = Just protocolIdleTimeout,
serverControlChannel = inbgovControlChannel,
governorControlChannel = outgovControlChannel,
getPeerSharing = \_ -> NoPeerSharing,
serverObservableStateVar = observableStateVar
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ import Ouroboros.Network.ConnectionId (ConnectionId (..))
import Ouroboros.Network.ConnectionManager.Types hiding
(TrUnexpectedlyFalseAssertion)
import Ouroboros.Network.InboundGovernor.ControlChannel
(ServerControlChannel)
(GovernorControlChannel, ServerControlChannel)
import qualified Ouroboros.Network.InboundGovernor.ControlChannel as ControlChannel
import Ouroboros.Network.InboundGovernor.Event
import Ouroboros.Network.InboundGovernor.State
import Ouroboros.Network.Mux
import Ouroboros.Network.PeerSelection.PeerSharing.Type (PeerSharing)
import Ouroboros.Network.Server.RateLimiting


Expand Down Expand Up @@ -99,13 +100,16 @@ inboundGovernor :: forall (muxMode :: MuxMode) socket peerAddr versionData versi
=> Tracer m (RemoteTransitionTrace peerAddr)
-> Tracer m (InboundGovernorTrace peerAddr)
-> ServerControlChannel muxMode peerAddr versionData ByteString m a b
-> GovernorControlChannel peerAddr m
-> (versionData -> PeerSharing)
-> Maybe DiffTime -- protocol idle timeout
-> MuxConnectionManager muxMode socket peerAddr versionData
versionNumber ByteString m a b
-> StrictTVar m InboundGovernorObservableState
-> m Void
inboundGovernor trTracer tracer serverControlChannel inboundIdleTimeout
connectionManager observableStateVar = do
inboundGovernor trTracer tracer serverControlChannel governorControlChannel
getPeerSharing inboundIdleTimeout connectionManager
observableStateVar = do
-- State needs to be a TVar, otherwise, when catching the exception inside
-- the loop we do not have access to the most recent version of the state
-- and might be truncating transitions.
Expand Down Expand Up @@ -176,10 +180,18 @@ inboundGovernor trTracer tracer serverControlChannel inboundIdleTimeout
provenance
connId
csDataFlow
(Handle csMux muxBundle _ _)) -> do
(Handle csMux muxBundle _ versionData)) -> do

traceWith tracer (TrNewConnection provenance connId)

-- Comunicate this new inbound connection to the Outbound Governor
when (provenance == Inbound) $
atomically $
ControlChannel.writeMessage governorControlChannel
( localAddress connId
, getPeerSharing versionData
)

igsConnections <- Map.alterF
(\case
-- connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import Network.TypedProtocol.Codec

import Ouroboros.Network.CodecCBORTerm
import Ouroboros.Network.ConnectionManager.Types (DataFlow (..))
import Ouroboros.Network.PeerSelection.PeerSharing.Type
(PeerSharing (..))
import Ouroboros.Network.Protocol.Handshake.Codec
import Ouroboros.Network.Protocol.Handshake.Type
import Ouroboros.Network.Protocol.Handshake.Version
Expand Down Expand Up @@ -70,24 +72,32 @@ unversionedProtocol = simpleSingletonVersions UnversionedProtocol UnversionedPro

-- | Alternative for 'UnversionedProtocolData' which contains 'DataFlow'.
--
newtype DataFlowProtocolData =
DataFlowProtocolData { getProtocolDataFlow :: DataFlow }
data DataFlowProtocolData =
DataFlowProtocolData { getProtocolDataFlow :: DataFlow, getProtocolPeerSharing :: PeerSharing }
deriving (Eq, Show)

instance Acceptable DataFlowProtocolData where
acceptableVersion (DataFlowProtocolData local) (DataFlowProtocolData remote) =
Accept (DataFlowProtocolData $ local `min` remote)
acceptableVersion (DataFlowProtocolData local _) (DataFlowProtocolData remote ps) =
Accept (DataFlowProtocolData (local `min` remote) ps)

dataFlowProtocolDataCodec :: UnversionedProtocol -> CodecCBORTerm Text DataFlowProtocolData
dataFlowProtocolDataCodec _ = CodecCBORTerm {encodeTerm, decodeTerm}
where
encodeTerm :: DataFlowProtocolData -> CBOR.Term
encodeTerm (DataFlowProtocolData Unidirectional) = CBOR.TBool False
encodeTerm (DataFlowProtocolData Duplex) = CBOR.TBool True
encodeTerm (DataFlowProtocolData Unidirectional NoPeerSharing) = CBOR.TList [CBOR.TBool False, CBOR.TInt 0]
encodeTerm (DataFlowProtocolData Unidirectional PeerSharingPrivate) = CBOR.TList [CBOR.TBool False, CBOR.TInt 1]
encodeTerm (DataFlowProtocolData Unidirectional PeerSharingPublic) = CBOR.TList [CBOR.TBool False, CBOR.TInt 2]
encodeTerm (DataFlowProtocolData Duplex NoPeerSharing) = CBOR.TList [CBOR.TBool True, CBOR.TInt 0]
encodeTerm (DataFlowProtocolData Duplex PeerSharingPrivate) = CBOR.TList [CBOR.TBool True, CBOR.TInt 1]
encodeTerm (DataFlowProtocolData Duplex PeerSharingPublic) = CBOR.TList [CBOR.TBool True, CBOR.TInt 2]

decodeTerm :: CBOR.Term -> Either Text DataFlowProtocolData
decodeTerm (CBOR.TBool False) = Right (DataFlowProtocolData Unidirectional)
decodeTerm (CBOR.TBool True) = Right (DataFlowProtocolData Duplex)
decodeTerm (CBOR.TList [CBOR.TBool False, CBOR.TInt 0]) = Right (DataFlowProtocolData Unidirectional NoPeerSharing)
decodeTerm (CBOR.TList [CBOR.TBool False, CBOR.TInt 1]) = Right (DataFlowProtocolData Unidirectional PeerSharingPrivate)
decodeTerm (CBOR.TList [CBOR.TBool False, CBOR.TInt 2]) = Right (DataFlowProtocolData Unidirectional PeerSharingPublic)
decodeTerm (CBOR.TList [CBOR.TBool True, CBOR.TInt 0]) = Right (DataFlowProtocolData Duplex NoPeerSharing)
decodeTerm (CBOR.TList [CBOR.TBool True, CBOR.TInt 1]) = Right (DataFlowProtocolData Duplex PeerSharingPrivate)
decodeTerm (CBOR.TList [CBOR.TBool True, CBOR.TInt 2]) = Right (DataFlowProtocolData Duplex PeerSharingPublic)
decodeTerm t = Left $ T.pack $ "unexpected term: " ++ show t

dataFlowProtocol :: DataFlow
Expand All @@ -96,7 +106,7 @@ dataFlowProtocol :: DataFlow
DataFlowProtocolData
app
dataFlowProtocol dataFlow =
simpleSingletonVersions UnversionedProtocol (DataFlowProtocolData dataFlow)
simpleSingletonVersions UnversionedProtocol (DataFlowProtocolData dataFlow NoPeerSharing)

-- | 'Handshake' codec used in various tests.
--
Expand Down
15 changes: 15 additions & 0 deletions ouroboros-network-framework/src/Ouroboros/Network/Server2.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import Ouroboros.Network.InboundGovernor
import Ouroboros.Network.InboundGovernor.ControlChannel
import qualified Ouroboros.Network.InboundGovernor.ControlChannel as ControlChannel
import Ouroboros.Network.Mux
import Ouroboros.Network.PeerSelection.PeerSharing.Type (PeerSharing)
import Ouroboros.Network.Server.RateLimiting
import Ouroboros.Network.Snocket

Expand Down Expand Up @@ -90,6 +91,16 @@ data ServerArguments (muxMode :: MuxMode) socket peerAddr versionData versionNu
serverControlChannel :: ServerControlChannel muxMode peerAddr versionData
bytes m a b,

-- | Governor control channel var is passed as an argument; this allows the Server
-- to communicate with the Outbound Governor telling it about new inbound
-- connections to add to the Known Peers Set.
--
governorControlChannel :: GovernorControlChannel peerAddr m,

-- | Extract 'PeerSharing' value from 'versionData'
--
getPeerSharing :: versionData -> PeerSharing,

-- | Observable mutable state.
--
serverObservableStateVar :: StrictTVar m InboundGovernorObservableState
Expand Down Expand Up @@ -141,6 +152,8 @@ run ServerArguments {
serverInboundIdleTimeout,
serverConnectionManager,
serverControlChannel,
governorControlChannel,
getPeerSharing,
serverObservableStateVar
} = do
let sockets = NonEmpty.toList serverSockets
Expand All @@ -156,6 +169,8 @@ run ServerArguments {
inboundGovernor serverTrTracer
inboundGovernorTracer
serverControlChannel
governorControlChannel
getPeerSharing
serverInboundIdleTimeout
serverConnectionManager
serverObservableStateVar)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ withInitiatorOnlyConnectionManager name timeouts trTracer cmTracer snocket local
cmAddressType = \_ -> Just IPv4Address,
cmSnocket = snocket,
cmConfigureSocket = \_ _ -> return (),
connectionDataFlow = \_ (DataFlowProtocolData df) -> df,
connectionDataFlow = \_ (DataFlowProtocolData df _) -> df,
cmPrunePolicy = simplePrunePolicy,
cmConnectionsLimits = acceptedConnLimit,
cmTimeWaitTimeout = tTimeWaitTimeout timeouts,
Expand Down Expand Up @@ -515,6 +515,7 @@ withBidirectionalConnectionManager name timeouts
acceptedConnLimit k = do
mainThreadId <- myThreadId
inbgovControlChannel <- Server.newControlChannel
outbgovControlChannel <- Server.newControlChannel
-- we are not using the randomness
observableStateVar <- Server.newObservableStateVarFromSeed 0
let muxTracer = WithName name `contramap` nullTracer -- mux tracer
Expand All @@ -535,7 +536,7 @@ withBidirectionalConnectionManager name timeouts
cmConfigureSocket = \sock _ -> confSock sock,
cmTimeWaitTimeout = tTimeWaitTimeout timeouts,
cmOutboundIdleTimeout = tOutboundIdleTimeout timeouts,
connectionDataFlow = \_ (DataFlowProtocolData df) -> df,
connectionDataFlow = \_ (DataFlowProtocolData df _) -> df,
cmPrunePolicy = simplePrunePolicy,
cmConnectionsLimits = acceptedConnLimit
}
Expand Down Expand Up @@ -575,6 +576,8 @@ withBidirectionalConnectionManager name timeouts
serverConnectionManager = connectionManager,
serverInboundIdleTimeout = Just (tProtocolIdleTimeout timeouts),
serverControlChannel = inbgovControlChannel,
governorControlChannel = outbgovControlChannel,
getPeerSharing = \(DataFlowProtocolData _ ps) -> ps,
serverObservableStateVar = observableStateVar
}
)
Expand Down
23 changes: 19 additions & 4 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ import Ouroboros.Network.Diffusion.Utils
import Ouroboros.Network.ExitPolicy
import Ouroboros.Network.InboundGovernor (InboundGovernorTrace (..),
RemoteTransitionTrace)
import Ouroboros.Network.InboundGovernor.ControlChannel
(ControlChannel (..), GovernorControlChannel)
import Ouroboros.Network.IOManager
import Ouroboros.Network.Mux hiding (MiniProtocol (..))
import Ouroboros.Network.MuxMode
Expand All @@ -101,7 +103,8 @@ import Ouroboros.Network.PeerSelection.Governor.Types
import Ouroboros.Network.PeerSelection.LedgerPeers
(UseLedgerAfter (..), withLedgerPeers)
import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics)
import Ouroboros.Network.PeerSelection.PeerSharing.Type (PeerSharing)
import Ouroboros.Network.PeerSelection.PeerSharing.Type
(PeerSharing (..))
import Ouroboros.Network.PeerSelection.PeerStateActions
(PeerConnectionHandle, PeerSelectionActionsTrace (..),
PeerStateActionsArguments (..), pchPeerSharing,
Expand Down Expand Up @@ -340,6 +343,7 @@ data ConnectionManagerDataInMode peerAddr versionData m a (mode :: MuxMode) wher

CMDInInitiatorResponderMode
:: ServerControlChannel InitiatorResponderMode peerAddr versionData ByteString m a ()
-> GovernorControlChannel peerAddr m
-> StrictTVar m Server.InboundGovernorObservableState
-> ConnectionManagerDataInMode peerAddr versionData m a InitiatorResponderMode

Expand Down Expand Up @@ -710,6 +714,7 @@ runM Interfaces
Just $ withLocalSocket tracer diNtcGetFileDescriptor diNtcSnocket localAddr
$ \localSocket -> do
localControlChannel <- Server.newControlChannel
localGovControlChannel <- Server.newControlChannel
localServerStateVar <- Server.newObservableStateVar ntcInbgovRng

let localConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0
Expand Down Expand Up @@ -779,6 +784,11 @@ runM Interfaces
serverConnectionLimits = localConnectionLimits,
serverConnectionManager = localConnectionManager,
serverControlChannel = localControlChannel,
governorControlChannel = localGovControlChannel,
-- local thread does not start a Outbound Governor
-- so it doesn't matter what we put here.
-- 'NoPeerSharing' is set for all connections.
getPeerSharing = \_ -> NoPeerSharing,
serverObservableStateVar = localServerStateVar
}) Async.wait

Expand All @@ -805,6 +815,7 @@ runM Interfaces
HasInitiatorResponder <$>
(CMDInInitiatorResponderMode
<$> Server.newControlChannel
<*> Server.newControlChannel
<*> Server.newObservableStateVar ntnInbgovRng)

-- RNGs used for picking random peers from the ledger and for
Expand Down Expand Up @@ -921,6 +932,7 @@ runM Interfaces
daPeerSharing
(pchPeerSharing diNtnPeerSharing)
(readTVar (getPeerSharingRegistry daPeerSharingRegistry))
retry -- Will never receive inbound connections
peerStateActions
requestLedgerPeers
$ \localPeerSelectionActionsThread
Expand Down Expand Up @@ -965,7 +977,7 @@ runM Interfaces
-- Run peer selection and the server.
--
HasInitiatorResponder
(CMDInInitiatorResponderMode controlChannel observableStateVar) -> do
(CMDInInitiatorResponderMode serverControlChannel governorControlChannel observableStateVar) -> do
let connectionManagerArguments
:: NodeToNodeConnectionManagerArguments
InitiatorResponderMode
Expand Down Expand Up @@ -1024,7 +1036,7 @@ runM Interfaces
connectionManagerArguments
connectionHandler
classifyHandleError
(InResponderMode controlChannel)
(InResponderMode serverControlChannel)
$ \(connectionManager
:: NodeToNodeConnectionManager
InitiatorResponderMode ntnFd ntnAddr ntnVersionData ntnVersion m a ()
Expand Down Expand Up @@ -1066,6 +1078,7 @@ runM Interfaces
daPeerSharing
(pchPeerSharing diNtnPeerSharing)
(readTVar (getPeerSharingRegistry daPeerSharingRegistry))
(readMessage governorControlChannel)
peerStateActions
requestLedgerPeers
$ \localPeerRootProviderThread
Expand Down Expand Up @@ -1109,7 +1122,9 @@ runM Interfaces
serverConnectionLimits = daAcceptedConnectionsLimit,
serverConnectionManager = connectionManager,
serverInboundIdleTimeout = Just daProtocolIdleTimeout,
serverControlChannel = controlChannel,
serverControlChannel = serverControlChannel,
governorControlChannel = governorControlChannel,
getPeerSharing = diNtnPeerSharing,
serverObservableStateVar = observableStateVar
})
$ \serverThread ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,10 +552,11 @@ peerSelectionGovernorLoop tracer
-> Guarded (STM m) (TimedDecision m peeraddr peerconn)
guardedDecisions blockedAt peerSharing st =
-- All the alternative potentially-blocking decisions.
Monitor.connections actions st
<> Monitor.jobs jobPool st
<> Monitor.targetPeers actions st
<> Monitor.localRoots actions policy st
Monitor.connections actions st
<> Monitor.jobs jobPool st
<> Monitor.targetPeers actions st
<> Monitor.localRoots actions policy st
<> Monitor.newInboundConnections actions st

-- All the alternative non-blocking internal decisions.
<> RootPeers.belowTarget actions blockedAt st
Expand Down
Loading

0 comments on commit e241bce

Please sign in to comment.