Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Light Peer Sharing #4277

Merged
merged 5 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions ouroboros-network-framework/demo/connection-manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@ import Ouroboros.Network.Channel (fromChannel)
import Ouroboros.Network.ConnectionHandler
import Ouroboros.Network.ConnectionId
import Ouroboros.Network.ConnectionManager.Core
import Ouroboros.Network.ConnectionManager.InformationChannel
(newInformationChannel)
import Ouroboros.Network.ConnectionManager.Types
import Ouroboros.Network.ControlMessage (ControlMessageSTM)
import qualified Ouroboros.Network.InboundGovernor.ControlChannel as Server
import Ouroboros.Network.IOManager
import Ouroboros.Network.Mux
import Ouroboros.Network.MuxMode
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Codec
(timeLimitsHandshake)
Expand Down Expand Up @@ -212,7 +214,8 @@ withBidirectionalConnectionManager snocket makeBearer socket
}
k = do
mainThreadId <- myThreadId
inbgovControlChannel <- Server.newControlChannel
inbgovInfoChannel <- newInformationChannel
outgovInfoChannel <- newInformationChannel
-- as in the 'withInitiatorOnlyConnectionManager' we use a `StrictTVar` to
-- pass list of requests, but since we are also interested in the results we
-- need to have multable cells to pass the accumulators around.
Expand Down Expand Up @@ -244,7 +247,8 @@ withBidirectionalConnectionManager snocket makeBearer socket
acceptedConnectionsHardLimit = maxBound,
acceptedConnectionsSoftLimit = maxBound,
acceptedConnectionsDelay = 0
}
},
cmGetPeerSharing = \_ -> NoPeerSharing
}
(makeConnectionHandler
muxTracer
Expand All @@ -265,7 +269,8 @@ withBidirectionalConnectionManager snocket makeBearer socket
(mainThreadId, debugMuxErrorRethrowPolicy
<> debugIOErrorRethrowPolicy))
(\_ -> HandshakeFailure)
(InResponderMode inbgovControlChannel)
(InResponderMode inbgovInfoChannel)
(InResponderMode outgovInfoChannel)
$ \connectionManager -> do
serverAddr <- Snocket.getLocalAddr snocket socket
withAsync
Expand All @@ -279,7 +284,7 @@ withBidirectionalConnectionManager snocket makeBearer socket
serverConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0,
serverConnectionManager = connectionManager,
serverInboundIdleTimeout = Just protocolIdleTimeout,
serverControlChannel = inbgovControlChannel,
serverInboundInfoChannel = inbgovInfoChannel,
serverObservableStateVar = observableStateVar
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ library
Ouroboros.Network.ConnectionHandler
Ouroboros.Network.ConnectionManager.Types
Ouroboros.Network.ConnectionManager.Core
Ouroboros.Network.ConnectionManager.InformationChannel
Ouroboros.Network.InboundGovernor
Ouroboros.Network.InboundGovernor.Event
Ouroboros.Network.InboundGovernor.State
Ouroboros.Network.InboundGovernor.ControlChannel
Ouroboros.Network.RethrowPolicy
Ouroboros.Network.Server.ConnectionTable
Ouroboros.Network.Server.Socket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,15 @@ import Network.Mux.Trace (MuxTrace, WithMuxBearer (..))
import Network.Mux.Types (MuxMode)

import Ouroboros.Network.ConnectionId
import Ouroboros.Network.ConnectionManager.InformationChannel
(InformationChannel)
import qualified Ouroboros.Network.ConnectionManager.InformationChannel as InfoChannel
import Ouroboros.Network.ConnectionManager.Types
import qualified Ouroboros.Network.ConnectionManager.Types as CM
import Ouroboros.Network.InboundGovernor.ControlChannel
(ControlChannel (..))
import qualified Ouroboros.Network.InboundGovernor.ControlChannel as ControlChannel
import Ouroboros.Network.InboundGovernor.Event
(NewConnectionInfo (..))
import Ouroboros.Network.MuxMode
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing)
import Ouroboros.Network.Server.RateLimiting
(AcceptedConnectionsLimit (..))
import Ouroboros.Network.Snocket
Expand Down Expand Up @@ -135,7 +138,10 @@ data ConnectionManagerArguments handlerTrace socket peerAddr handle handleError
-- | Prune policy
--
cmPrunePolicy :: PrunePolicy peerAddr (STM m),
cmConnectionsLimits :: AcceptedConnectionsLimit
cmConnectionsLimits :: AcceptedConnectionsLimit,

-- | How to extract PeerSharing information from versionData
cmGetPeerSharing :: versionData -> PeerSharing
}


Expand Down Expand Up @@ -551,9 +557,12 @@ withConnectionManager
-- ^ Callback which runs in a thread dedicated for a given connection.
-> (handleError -> HandleErrorType)
-- ^ classify 'handleError's
-> InResponderMode muxMode (ControlChannel peerAddr handle m)
-> InResponderMode muxMode (InformationChannel (NewConnectionInfo peerAddr handle) m)
-- ^ On outbound duplex connections we need to notify the server about
-- a new connection.
-> InResponderMode muxMode (InformationChannel (peerAddr, PeerSharing) m)
-- ^ On inbound duplex connections we need to notify the outbound governor about
-- a new connection.
-> (ConnectionManager muxMode socket peerAddr handle handleError m -> m a)
-- ^ Continuation which receives the 'ConnectionManager'. It must not leak
-- outside of scope of this callback. Once it returns all resources
Expand All @@ -573,13 +582,15 @@ withConnectionManager ConnectionManagerArguments {
cmOutboundIdleTimeout,
connectionDataFlow,
cmPrunePolicy,
cmConnectionsLimits
cmConnectionsLimits,
cmGetPeerSharing
}
ConnectionHandler {
connectionHandler
}
classifyHandleError
inboundGovernorControlChannel
inboundGovernorInfoChannel
outboundGovernorInfoChannel
k = do
((freshIdSupply, stateVar)
:: ( FreshIdSupply m
Expand Down Expand Up @@ -1165,12 +1176,20 @@ withConnectionManager ConnectionManagerArguments {
case mbTransition of
Nothing -> return $ Disconnected connId Nothing
Just {} -> do
case inboundGovernorControlChannel of
InResponderMode controlChannel ->
atomically $ ControlChannel.writeMessage
controlChannel
(ControlChannel.NewConnection Inbound connId dataFlow handle)
case inboundGovernorInfoChannel of
InResponderMode infoChannel ->
atomically $ InfoChannel.writeMessage
infoChannel
(NewConnectionInfo Inbound connId dataFlow handle)
NotInResponderMode -> return ()
case (dataFlow, outboundGovernorInfoChannel) of
(Duplex, InResponderMode infoChannel) ->
atomically $ InfoChannel.writeMessage
infoChannel
(peerAddr, cmGetPeerSharing versionData)

_ -> return ()

return $ Connected connId dataFlow handle

terminateInboundWithErrorOrQuery connId connVar connThread peerAddr stateVar mutableConnState handleErrorM = do
Expand Down Expand Up @@ -1754,11 +1773,11 @@ withConnectionManager ConnectionManagerArguments {
-- @
let connState' = OutboundDupState connId connThread handle Ticking
writeTVar connVar connState'
case inboundGovernorControlChannel of
InResponderMode controlChannel ->
ControlChannel.writeMessage
controlChannel
(ControlChannel.NewConnection Outbound connId dataFlow handle)
case inboundGovernorInfoChannel of
InResponderMode infoChannel ->
InfoChannel.writeMessage
infoChannel
(NewConnectionInfo Outbound connId dataFlow handle)
NotInResponderMode -> return ()
return (Just $ mkTransition connState connState')
TerminatedState _ ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE RankNTypes #-}
module Ouroboros.Network.ConnectionManager.InformationChannel where

import Control.Concurrent.Class.MonadSTM.Strict

import Data.Functor (($>))
import GHC.Natural (Natural)
import Ouroboros.Network.ConnectionHandler (Handle)
import Ouroboros.Network.InboundGovernor.Event (NewConnectionInfo)
import Ouroboros.Network.Mux (MuxMode)
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing)

-- | Information channel.
--
data InformationChannel a m =
InformationChannel {
-- | Read a single value from the channel.
--
readMessage :: STM m a,

-- | Write a value to the channel.
--
writeMessage :: a -> STM m ()
}

-- | A Server control channel which instantiates to 'NewConnection' and 'Handle'.
--
-- It allows to pass 'STM' transactions which will resolve to 'NewConnection'.
-- Server's monitoring thread is the consumer of these messages; there are two
-- producers: accept loop and connection handler for outbound connections.
--
type InboundGovernorInfoChannel (muxMode :: MuxMode) peerAddr versionData bytes m a b =
InformationChannel (NewConnectionInfo peerAddr (Handle muxMode peerAddr versionData bytes m a b)) m

-- | Control Channel between Server and Outbound Governor.
--
-- Control channel that is meant to share inbound connections with the Peer
-- Selection Governor. So the consumer is the Governor and Producer is the
-- Server.
--
type OutboundGovernorInfoChannel peerAddr m =
InformationChannel (peerAddr, PeerSharing) m


newInformationChannel :: forall a m.
MonadLabelledSTM m
=> m (InformationChannel a m)
newInformationChannel = do
channel <-
atomically $
newTBQueue cc_QUEUE_BOUND
>>= \q -> labelTBQueue q "server-cc" $> q
pure $ InformationChannel {
readMessage = readTBQueue channel,
writeMessage = writeTBQueue channel
}

-- | The 'InformationChannel's 'TBQueue' depth.
--
cc_QUEUE_BOUND :: Natural
cc_QUEUE_BOUND = 10
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,16 @@ import qualified Network.Mux as Mux
import Ouroboros.Network.Channel (fromChannel)
import Ouroboros.Network.ConnectionHandler
import Ouroboros.Network.ConnectionId (ConnectionId (..))
import Ouroboros.Network.ConnectionManager.InformationChannel
(InboundGovernorInfoChannel)
import qualified Ouroboros.Network.ConnectionManager.InformationChannel as InfoChannel
import Ouroboros.Network.ConnectionManager.Types hiding
(TrUnexpectedlyFalseAssertion)
import Ouroboros.Network.InboundGovernor.ControlChannel
(ServerControlChannel)
import qualified Ouroboros.Network.InboundGovernor.ControlChannel as ControlChannel
import Ouroboros.Network.InboundGovernor.Event
import Ouroboros.Network.InboundGovernor.State
import Ouroboros.Network.Mux
import Ouroboros.Network.Server.RateLimiting



-- | Run the server, which consists of the following components:
--
-- * /inbound governor/, it corresponds to p2p-governor on outbound side
Expand Down Expand Up @@ -97,14 +95,15 @@ inboundGovernor :: forall (muxMode :: MuxMode) socket peerAddr versionData versi
)
=> Tracer m (RemoteTransitionTrace peerAddr)
-> Tracer m (InboundGovernorTrace peerAddr)
-> ServerControlChannel muxMode peerAddr versionData ByteString m a b
-> InboundGovernorInfoChannel muxMode peerAddr versionData ByteString m a b
-> Maybe DiffTime -- protocol idle timeout
-> MuxConnectionManager muxMode socket peerAddr versionData
versionNumber ByteString m a b
-> StrictTVar m InboundGovernorObservableState
-> m Void
inboundGovernor trTracer tracer serverControlChannel inboundIdleTimeout
connectionManager observableStateVar = do
inboundGovernor trTracer tracer inboundInfoChannel
inboundIdleTimeout connectionManager
observableStateVar = do
-- State needs to be a TVar, otherwise, when catching the exception inside
-- the loop we do not have access to the most recent version of the state
-- and might be truncating transitions.
Expand Down Expand Up @@ -165,13 +164,13 @@ inboundGovernor trTracer tracer serverControlChannel inboundIdleTimeout
)
(igsConnections state)
<> FirstToFinish (
NewConnection <$> ControlChannel.readMessage serverControlChannel)
NewConnection <$> InfoChannel.readMessage inboundInfoChannel)
(mbConnId, state') <- case event of
NewConnection
-- new connection has been announced by either accept loop or
-- by connection manager (in which case the connection is in
-- 'DuplexState').
(ControlChannel.NewConnection
(NewConnectionInfo
provenance
connId
csDataFlow
Expand Down Expand Up @@ -501,7 +500,6 @@ runResponder mux
startStrategy
(runMuxPeer responder . fromChannel)


--
-- Trace
--
Expand Down

This file was deleted.

Loading