From 090a5c1c25cddc86141eb25f83484849ef640c7a Mon Sep 17 00:00:00 2001 From: Armando Santos Date: Thu, 12 Jan 2023 17:50:02 +0000 Subject: [PATCH 1/5] Refactor ControlChannel So that we can use it for sharing information between InboundGovernor and Outbound Governor. --- .../demo/connection-manager.hs | 5 +- .../ouroboros-network-framework.cabal | 2 +- .../Network/ConnectionManager/Core.hs | 18 ++-- .../ConnectionManager/InformationChannel.hs | 49 ++++++++++ .../src/Ouroboros/Network/InboundGovernor.hs | 28 ++++-- .../Network/InboundGovernor/ControlChannel.hs | 95 ------------------- .../Network/InboundGovernor/Event.hs | 29 +++++- .../src/Ouroboros/Network/Server2.hs | 8 +- .../Ouroboros/Network/ConnectionManager.hs | 8 +- .../test/Test/Ouroboros/Network/Server2.hs | 5 +- .../src/Ouroboros/Network/Diffusion/P2P.hs | 14 +-- 11 files changed, 128 insertions(+), 133 deletions(-) create mode 100644 ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs delete mode 100644 ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor/ControlChannel.hs diff --git a/ouroboros-network-framework/demo/connection-manager.hs b/ouroboros-network-framework/demo/connection-manager.hs index 917cfae820f..f7399dce63b 100644 --- a/ouroboros-network-framework/demo/connection-manager.hs +++ b/ouroboros-network-framework/demo/connection-manager.hs @@ -59,9 +59,10 @@ import Ouroboros.Network.Channel (fromChannel) import Ouroboros.Network.ConnectionHandler import Ouroboros.Network.ConnectionId import Ouroboros.Network.ConnectionManager.Core +import Ouroboros.Network.ConnectionManager.InformationChannel + (newInformationChannel) import Ouroboros.Network.ConnectionManager.Types import Ouroboros.Network.ControlMessage (ControlMessageSTM) -import qualified Ouroboros.Network.InboundGovernor.ControlChannel as Server import Ouroboros.Network.IOManager import Ouroboros.Network.Mux import Ouroboros.Network.MuxMode @@ -212,7 +213,7 @@ withBidirectionalConnectionManager snocket makeBearer socket } k = do mainThreadId <- myThreadId - inbgovControlChannel <- Server.newControlChannel + inbgovControlChannel <- newInformationChannel -- as in the 'withInitiatorOnlyConnectionManager' we use a `StrictTVar` to -- pass list of requests, but since we are also interested in the results we -- need to have multable cells to pass the accumulators around. diff --git a/ouroboros-network-framework/ouroboros-network-framework.cabal b/ouroboros-network-framework/ouroboros-network-framework.cabal index ebeaf77a653..623ccc01c65 100644 --- a/ouroboros-network-framework/ouroboros-network-framework.cabal +++ b/ouroboros-network-framework/ouroboros-network-framework.cabal @@ -43,10 +43,10 @@ library Ouroboros.Network.ConnectionHandler Ouroboros.Network.ConnectionManager.Types Ouroboros.Network.ConnectionManager.Core + Ouroboros.Network.ConnectionManager.InformationChannel Ouroboros.Network.InboundGovernor Ouroboros.Network.InboundGovernor.Event Ouroboros.Network.InboundGovernor.State - Ouroboros.Network.InboundGovernor.ControlChannel Ouroboros.Network.RethrowPolicy Ouroboros.Network.Server.ConnectionTable Ouroboros.Network.Server.Socket diff --git a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/Core.hs b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/Core.hs index 27829a70625..14955eca5bc 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/Core.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/Core.hs @@ -55,11 +55,13 @@ import Network.Mux.Trace (MuxTrace, WithMuxBearer (..)) import Network.Mux.Types (MuxMode) import Ouroboros.Network.ConnectionId +import Ouroboros.Network.ConnectionManager.InformationChannel + (InformationChannel) +import qualified Ouroboros.Network.ConnectionManager.InformationChannel as InfoChannel import Ouroboros.Network.ConnectionManager.Types import qualified Ouroboros.Network.ConnectionManager.Types as CM -import Ouroboros.Network.InboundGovernor.ControlChannel - (ControlChannel (..)) -import qualified Ouroboros.Network.InboundGovernor.ControlChannel as ControlChannel +import Ouroboros.Network.InboundGovernor.Event + (NewConnectionInfo (..)) import Ouroboros.Network.MuxMode import Ouroboros.Network.Server.RateLimiting (AcceptedConnectionsLimit (..)) @@ -551,7 +553,7 @@ withConnectionManager -- ^ Callback which runs in a thread dedicated for a given connection. -> (handleError -> HandleErrorType) -- ^ classify 'handleError's - -> InResponderMode muxMode (ControlChannel peerAddr handle m) + -> InResponderMode muxMode (InformationChannel (NewConnectionInfo peerAddr handle) m) -- ^ On outbound duplex connections we need to notify the server about -- a new connection. -> (ConnectionManager muxMode socket peerAddr handle handleError m -> m a) @@ -1167,9 +1169,9 @@ withConnectionManager ConnectionManagerArguments { Just {} -> do case inboundGovernorControlChannel of InResponderMode controlChannel -> - atomically $ ControlChannel.writeMessage + atomically $ InfoChannel.writeMessage controlChannel - (ControlChannel.NewConnection Inbound connId dataFlow handle) + (NewConnectionInfo Inbound connId dataFlow handle) NotInResponderMode -> return () return $ Connected connId dataFlow handle @@ -1756,9 +1758,9 @@ withConnectionManager ConnectionManagerArguments { writeTVar connVar connState' case inboundGovernorControlChannel of InResponderMode controlChannel -> - ControlChannel.writeMessage + InfoChannel.writeMessage controlChannel - (ControlChannel.NewConnection Outbound connId dataFlow handle) + (NewConnectionInfo Outbound connId dataFlow handle) NotInResponderMode -> return () return (Just $ mkTransition connState connState') TerminatedState _ -> diff --git a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs new file mode 100644 index 00000000000..3e3958518c6 --- /dev/null +++ b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs @@ -0,0 +1,49 @@ +{-# LANGUAGE RankNTypes #-} +module Ouroboros.Network.ConnectionManager.InformationChannel where + +import Control.Concurrent.Class.MonadSTM.Strict + +import Data.Functor (($>)) +import GHC.Natural (Natural) +import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing) + +-- | Information channel. +-- +data InformationChannel a m = + InformationChannel { + -- | Read a single value from the channel. + -- + readMessage :: STM m a, + + -- | Write a value to the channel. + -- + writeMessage :: a -> STM m () + } + + +newInformationChannel :: forall a m. + MonadLabelledSTM m + => m (InformationChannel a m) +newInformationChannel = do + channel <- + atomically $ + newTBQueue cc_QUEUE_BOUND + >>= \q -> labelTBQueue q "server-cc" $> q + pure $ InformationChannel { + readMessage = readTBQueue channel, + writeMessage = writeTBQueue channel + } + +-- | The 'InformationChannel's 'TBQueue' depth. +-- +cc_QUEUE_BOUND :: Natural +cc_QUEUE_BOUND = 10 + +-- | Control Channel between Server and Outbound Governor. +-- +-- Control channel that is meant to share inbound connections with the Peer +-- Selection Governor. So the consumer is the Governor and Producer is the +-- Server. +-- +type OutboundGovernorInfoChannel peerAddr m = + InformationChannel (peerAddr, PeerSharing) m diff --git a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs index 0e31c0abd4e..9299a6c331e 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs @@ -21,6 +21,8 @@ module Ouroboros.Network.InboundGovernor , newObservableStateVarFromSeed -- * Run Inbound Protocol Governor , inboundGovernor + -- * Auxiliary Types + , InboundGovernorInfoChannel -- * Trace , InboundGovernorTrace (..) , RemoteSt (..) @@ -55,18 +57,16 @@ import qualified Network.Mux as Mux import Ouroboros.Network.Channel (fromChannel) import Ouroboros.Network.ConnectionHandler import Ouroboros.Network.ConnectionId (ConnectionId (..)) +import Ouroboros.Network.ConnectionManager.InformationChannel + (InformationChannel) +import qualified Ouroboros.Network.ConnectionManager.InformationChannel as InfoChannel import Ouroboros.Network.ConnectionManager.Types hiding (TrUnexpectedlyFalseAssertion) -import Ouroboros.Network.InboundGovernor.ControlChannel - (ServerControlChannel) -import qualified Ouroboros.Network.InboundGovernor.ControlChannel as ControlChannel import Ouroboros.Network.InboundGovernor.Event import Ouroboros.Network.InboundGovernor.State import Ouroboros.Network.Mux import Ouroboros.Network.Server.RateLimiting - - -- | Run the server, which consists of the following components: -- -- * /inbound governor/, it corresponds to p2p-governor on outbound side @@ -97,7 +97,7 @@ inboundGovernor :: forall (muxMode :: MuxMode) socket peerAddr versionData versi ) => Tracer m (RemoteTransitionTrace peerAddr) -> Tracer m (InboundGovernorTrace peerAddr) - -> ServerControlChannel muxMode peerAddr versionData ByteString m a b + -> InboundGovernorInfoChannel muxMode peerAddr versionData ByteString m a b -> Maybe DiffTime -- protocol idle timeout -> MuxConnectionManager muxMode socket peerAddr versionData versionNumber ByteString m a b @@ -165,13 +165,13 @@ inboundGovernor trTracer tracer serverControlChannel inboundIdleTimeout ) (igsConnections state) <> FirstToFinish ( - NewConnection <$> ControlChannel.readMessage serverControlChannel) + NewConnection <$> InfoChannel.readMessage serverControlChannel) (mbConnId, state') <- case event of NewConnection -- new connection has been announced by either accept loop or -- by connection manager (in which case the connection is in -- 'DuplexState'). - (ControlChannel.NewConnection + (NewConnectionInfo provenance connId csDataFlow @@ -501,6 +501,18 @@ runResponder mux startStrategy (runMuxPeer responder . fromChannel) +-- +-- Auxiliary Types +-- + +-- | A Server control channel which instantiates to 'NewConnection' and 'Handle'. +-- +-- It allows to pass 'STM' transactions which will resolve to 'NewConnection'. +-- Server's monitoring thread is the consumer of these messages; there are two +-- producers: accept loop and connection handler for outbound connections. +-- +type InboundGovernorInfoChannel (muxMode :: MuxMode) peerAddr versionData bytes m a b = + InformationChannel (NewConnectionInfo peerAddr (Handle muxMode peerAddr versionData bytes m a b)) m -- -- Trace diff --git a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor/ControlChannel.hs b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor/ControlChannel.hs deleted file mode 100644 index 2992e1e88c1..00000000000 --- a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor/ControlChannel.hs +++ /dev/null @@ -1,95 +0,0 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE KindSignatures #-} -{-# LANGUAGE ScopedTypeVariables #-} - - --- | Intended to be imported qualified. --- -module Ouroboros.Network.InboundGovernor.ControlChannel - ( NewConnection (..) - , ControlChannel (..) - , ServerControlChannel - , newControlChannel - ) where - -import Control.Concurrent.Class.MonadSTM.Strict - -import Data.Functor (($>)) -import GHC.Natural (Natural) - -import Network.Mux.Types (MuxMode) - -import Ouroboros.Network.ConnectionHandler -import Ouroboros.Network.ConnectionId (ConnectionId (..)) -import Ouroboros.Network.ConnectionManager.Types - - --- | Announcement message for a new connection. --- -data NewConnection peerAddr handle - - -- | Announce a new connection. /Inbound protocol governor/ will start - -- responder protocols using 'StartOnDemand' strategy and monitor remote - -- transitions: @PromotedToWarm^{Duplex}_{Remote}@ and - -- @DemotedToCold^{dataFlow}_{Remote}@. - = NewConnection - !Provenance - !(ConnectionId peerAddr) - !DataFlow - !handle - -instance Show peerAddr - => Show (NewConnection peerAddr handle) where - show (NewConnection provenance connId dataFlow _) = - concat [ "NewConnection " - , show provenance - , " " - , show connId - , " " - , show dataFlow - ] - - - --- | A Server control channel which instantiates 'handle'. --- -type ServerControlChannel (muxMode :: MuxMode) peerAddr versionData bytes m a b = - ControlChannel peerAddr (Handle muxMode peerAddr versionData bytes m a b) m - --- | Control channel. It allows to pass 'STM' transactions which will --- resolve to 'NewConnection'. Server's monitoring thread is the consumer --- of these messages; there are two producers: accept loop and connection --- handler for outbound connections. --- -data ControlChannel peerAddr handle m = - ControlChannel { - -- | Read a single 'NewConnection' instruction from the channel. - -- - readMessage :: STM m (NewConnection peerAddr handle), - - -- | Write a 'NewConnection' to the channel. - -- - writeMessage :: NewConnection peerAddr handle -> STM m () - } - - -newControlChannel :: forall peerAddr handle m. - MonadLabelledSTM m - => m (ControlChannel peerAddr handle m) -newControlChannel = do - channel <- - atomically $ - newTBQueue cc_QUEUE_BOUND - >>= \q -> labelTBQueue q "server-cc" $> q - pure $ ControlChannel { - readMessage = readTBQueue channel, - writeMessage = writeTBQueue channel - } - - --- | The 'ControlChannel's 'TBQueue' depth. --- -cc_QUEUE_BOUND :: Natural -cc_QUEUE_BOUND = 10 diff --git a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor/Event.hs b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor/Event.hs index 39bf09853a7..0befedc3fef 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor/Event.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor/Event.hs @@ -21,6 +21,7 @@ module Ouroboros.Network.InboundGovernor.Event , firstPeerDemotedToWarm , firstPeerDemotedToCold , firstPeerCommitRemote + , NewConnectionInfo (..) ) where import Control.Applicative (Alternative) @@ -41,18 +42,42 @@ import Network.Mux.Types (MiniProtocolDir (..), import Ouroboros.Network.ConnectionHandler import Ouroboros.Network.ConnectionId (ConnectionId (..)) import Ouroboros.Network.ConnectionManager.Types -import qualified Ouroboros.Network.InboundGovernor.ControlChannel as ControlChannel import Ouroboros.Network.InboundGovernor.State import Ouroboros.Network.Mux +-- | Announcement message for a new connection. +-- +data NewConnectionInfo peerAddr handle + + -- | Announce a new connection. /Inbound protocol governor/ will start + -- responder protocols using 'StartOnDemand' strategy and monitor remote + -- transitions: @PromotedToWarm^{Duplex}_{Remote}@ and + -- @DemotedToCold^{dataFlow}_{Remote}@. + = NewConnectionInfo + !Provenance + !(ConnectionId peerAddr) + !DataFlow + !handle + +instance Show peerAddr + => Show (NewConnectionInfo peerAddr handle) where + show (NewConnectionInfo provenance connId dataFlow _) = + concat [ "NewConnectionInfo " + , show provenance + , " " + , show connId + , " " + , show dataFlow + ] + -- | Edge triggered events to which the /inbound protocol governor/ reacts. -- data Event (muxMode :: MuxMode) peerAddr versionData m a b -- | A request to start mini-protocol bundle, either from the server or from -- connection manager after a duplex connection was negotiated. -- - = NewConnection !(ControlChannel.NewConnection peerAddr + = NewConnection !(NewConnectionInfo peerAddr (Handle muxMode peerAddr versionData ByteString m a b)) -- | A multiplexer exited. diff --git a/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs b/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs index abed17e188d..206832b7880 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs @@ -27,8 +27,6 @@ module Ouroboros.Network.Server2 , RemoteSt (..) , RemoteTransition , RemoteTransitionTrace - -- * ControlChannel - , module ControlChannel ) where import Control.Applicative (Alternative) @@ -53,8 +51,6 @@ import Foreign.C.Error import Ouroboros.Network.ConnectionHandler import Ouroboros.Network.ConnectionManager.Types import Ouroboros.Network.InboundGovernor -import Ouroboros.Network.InboundGovernor.ControlChannel -import qualified Ouroboros.Network.InboundGovernor.ControlChannel as ControlChannel import Ouroboros.Network.Mux import Ouroboros.Network.Server.RateLimiting import Ouroboros.Network.Snocket @@ -88,8 +84,8 @@ data ServerArguments (muxMode :: MuxMode) socket peerAddr versionData versionNu -- server to run and manage responders which needs to be started on -- inbound connections. -- - serverControlChannel :: ServerControlChannel muxMode peerAddr versionData - bytes m a b, + serverControlChannel :: InboundGovernorInfoChannel muxMode peerAddr versionData + bytes m a b, -- | Observable mutable state. -- diff --git a/ouroboros-network-framework/test/Test/Ouroboros/Network/ConnectionManager.hs b/ouroboros-network-framework/test/Test/Ouroboros/Network/ConnectionManager.hs index e7d0d58bd18..450e498fdc5 100644 --- a/ouroboros-network-framework/test/Test/Ouroboros/Network/ConnectionManager.hs +++ b/ouroboros-network-framework/test/Test/Ouroboros/Network/ConnectionManager.hs @@ -59,12 +59,14 @@ import Test.Tasty.QuickCheck (testProperty) import Ouroboros.Network.ConnectionId (ConnectionId (..)) import Ouroboros.Network.ConnectionManager.Core import Ouroboros.Network.ConnectionManager.Types -import qualified Ouroboros.Network.InboundGovernor.ControlChannel as ControlChannel import Ouroboros.Network.MuxMode import Ouroboros.Network.Server.RateLimiting import Ouroboros.Network.Snocket (Accept (..), Accepted (..), AddressFamily (TestFamily), Snocket (..), TestAddress (..)) +import Ouroboros.Network.ConnectionManager.InformationChannel + (newInformationChannel) +import qualified Ouroboros.Network.ConnectionManager.InformationChannel as InfoChannel import TestLib.ConnectionManager (verifyAbstractTransition) @@ -746,7 +748,7 @@ prop_valid_transitions (SkewedBool bindToLocalAddress) scheduleMap = - Debug.traceShowM (t, msg)) --} - inbgovControlChannel <- ControlChannel.newControlChannel + inbgovControlChannel <- newInformationChannel let connectionHandler = mkConnectionHandler snocket result <- withConnectionManager ConnectionManagerArguments { @@ -932,7 +934,7 @@ prop_valid_transitions (SkewedBool bindToLocalAddress) scheduleMap = -- run poor man's server which just reads the control channel, -- otherwise it would block if there are more than 10 connections. - forever (atomically (ControlChannel.readMessage inbgovControlChannel) $> ()) + forever (atomically (InfoChannel.readMessage inbgovControlChannel) $> ()) `race_` (do a <- accept snocket fd threads <- go [] a (schedule scheduleMap) diff --git a/ouroboros-network-framework/test/Test/Ouroboros/Network/Server2.hs b/ouroboros-network-framework/test/Test/Ouroboros/Network/Server2.hs index 30fb47947ec..b1813f82844 100644 --- a/ouroboros-network-framework/test/Test/Ouroboros/Network/Server2.hs +++ b/ouroboros-network-framework/test/Test/Ouroboros/Network/Server2.hs @@ -88,7 +88,6 @@ import Ouroboros.Network.ControlMessage (ControlMessageSTM) import Ouroboros.Network.Driver.Limits import Ouroboros.Network.InboundGovernor (InboundGovernorTrace (..)) import qualified Ouroboros.Network.InboundGovernor as IG -import qualified Ouroboros.Network.InboundGovernor.ControlChannel as Server import Ouroboros.Network.InboundGovernor.State (InboundGovernorCounters (..)) import Ouroboros.Network.IOManager @@ -127,6 +126,8 @@ import Ouroboros.Network.Testing.Utils (WithName (..), WithTime (..), import Test.Ouroboros.Network.Orphans () import Test.Simulation.Network.Snocket hiding (tests) +import Ouroboros.Network.ConnectionManager.InformationChannel + (newInformationChannel) import TestLib.ConnectionManager (abstractStateIsFinalTransition, allValidTransitionsNames, validTransitionMap, verifyAbstractTransition, verifyAbstractTransitionOrder) @@ -526,7 +527,7 @@ withBidirectionalConnectionManager name timeouts handshakeTimeLimits acceptedConnLimit k = do mainThreadId <- myThreadId - inbgovControlChannel <- Server.newControlChannel + inbgovControlChannel <- newInformationChannel -- we are not using the randomness observableStateVar <- Server.newObservableStateVarFromSeed 0 let muxTracer = WithName name `contramap` nullTracer -- mux tracer diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs index 68cb1dfcb80..0de4a39d284 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs @@ -80,13 +80,15 @@ import Ouroboros.Network.Socket (configureSocket, import Data.List (nub) import Ouroboros.Network.ConnectionHandler import Ouroboros.Network.ConnectionManager.Core +import Ouroboros.Network.ConnectionManager.InformationChannel + (newInformationChannel) import Ouroboros.Network.ConnectionManager.Types import Ouroboros.Network.Diffusion.Common hiding (nullTracers) import qualified Ouroboros.Network.Diffusion.Policies as Diffusion.Policies import Ouroboros.Network.Diffusion.Utils import Ouroboros.Network.ExitPolicy -import Ouroboros.Network.InboundGovernor (InboundGovernorTrace (..), - RemoteTransitionTrace) +import Ouroboros.Network.InboundGovernor (InboundGovernorInfoChannel, + InboundGovernorTrace (..), RemoteTransitionTrace) import Ouroboros.Network.IOManager import Ouroboros.Network.Mux hiding (MiniProtocol (..)) import Ouroboros.Network.MuxMode @@ -119,7 +121,7 @@ import Ouroboros.Network.PeerSharing (PeerSharingRegistry (..)) import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharingAmount) import Ouroboros.Network.RethrowPolicy import Ouroboros.Network.Server2 (ServerArguments (..), - ServerControlChannel, ServerTrace (..)) + ServerTrace (..)) import qualified Ouroboros.Network.Server2 as Server -- | P2P DiffusionTracers Extras @@ -346,7 +348,7 @@ data ConnectionManagerDataInMode peerAddr versionData m a (mode :: MuxMode) wher :: ConnectionManagerDataInMode peerAddr versionData m a InitiatorMode CMDInInitiatorResponderMode - :: ServerControlChannel InitiatorResponderMode peerAddr versionData ByteString m a () + :: InboundGovernorInfoChannel InitiatorResponderMode peerAddr versionData ByteString m a () -> StrictTVar m Server.InboundGovernorObservableState -> ConnectionManagerDataInMode peerAddr versionData m a InitiatorResponderMode @@ -730,7 +732,7 @@ runM Interfaces Just localAddr -> Just $ withLocalSocket tracer diNtcGetFileDescriptor diNtcSnocket localAddr $ \localSocket -> do - localControlChannel <- Server.newControlChannel + localControlChannel <- newInformationChannel localServerStateVar <- Server.newObservableStateVar ntcInbgovRng let localConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0 @@ -826,7 +828,7 @@ runM Interfaces InitiatorAndResponderDiffusionMode -> HasInitiatorResponder <$> (CMDInInitiatorResponderMode - <$> Server.newControlChannel + <$> newInformationChannel <*> Server.newObservableStateVar ntnInbgovRng) -- RNGs used for picking random peers from the ledger and for From 36ebebd410f7a4cadf530c0a0d91accf37da824e Mon Sep 17 00:00:00 2001 From: Armando Santos Date: Thu, 12 Jan 2023 17:53:02 +0000 Subject: [PATCH 2/5] Added Light Peer Sharing Added a Control Channel between the Inbound Governor and the OutboundGovernor. Reading from this channel is abstracted as a PeerSelectionAction. When the InboundGovernor receives an Inbound NewConnection it will write the address and PeerSharing Willingness information to the Channel. --- .../demo/connection-manager.hs | 12 ++- .../Network/ConnectionManager/Core.hs | 35 +++++-- .../ConnectionManager/InformationChannel.hs | 10 -- .../src/Ouroboros/Network/InboundGovernor.hs | 18 +++- .../Network/Protocol/Handshake/Unversioned.hs | 32 ++++-- .../src/Ouroboros/Network/Server2.hs | 6 +- .../Ouroboros/Network/ConnectionManager.hs | 12 ++- .../test/Test/Ouroboros/Network/Server2.hs | 19 ++-- .../src/Ouroboros/Network/Diffusion/P2P.hs | 40 +++++--- .../Network/PeerSelection/Governor.hs | 10 ++ .../Network/PeerSelection/Governor/Monitor.hs | 28 ++++++ .../Network/PeerSelection/Governor/Types.hs | 3 + .../Ouroboros/Network/PeerSelection/Simple.hs | 4 + .../Test/Ouroboros/Network/PeerSelection.hs | 97 ++++++++++--------- .../Network/PeerSelection/MockEnvironment.hs | 11 ++- .../test/Test/Ouroboros/Network/Testnet.hs | 2 + 16 files changed, 225 insertions(+), 114 deletions(-) diff --git a/ouroboros-network-framework/demo/connection-manager.hs b/ouroboros-network-framework/demo/connection-manager.hs index f7399dce63b..ae46ca0da0a 100644 --- a/ouroboros-network-framework/demo/connection-manager.hs +++ b/ouroboros-network-framework/demo/connection-manager.hs @@ -66,6 +66,7 @@ import Ouroboros.Network.ControlMessage (ControlMessageSTM) import Ouroboros.Network.IOManager import Ouroboros.Network.Mux import Ouroboros.Network.MuxMode +import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..)) import Ouroboros.Network.Protocol.Handshake import Ouroboros.Network.Protocol.Handshake.Codec (timeLimitsHandshake) @@ -213,7 +214,8 @@ withBidirectionalConnectionManager snocket makeBearer socket } k = do mainThreadId <- myThreadId - inbgovControlChannel <- newInformationChannel + inbgovInfoChannel <- newInformationChannel + outgovInfoChannel <- newInformationChannel -- as in the 'withInitiatorOnlyConnectionManager' we use a `StrictTVar` to -- pass list of requests, but since we are also interested in the results we -- need to have multable cells to pass the accumulators around. @@ -245,7 +247,8 @@ withBidirectionalConnectionManager snocket makeBearer socket acceptedConnectionsHardLimit = maxBound, acceptedConnectionsSoftLimit = maxBound, acceptedConnectionsDelay = 0 - } + }, + cmGetPeerSharing = \_ -> NoPeerSharing } (makeConnectionHandler muxTracer @@ -266,7 +269,8 @@ withBidirectionalConnectionManager snocket makeBearer socket (mainThreadId, debugMuxErrorRethrowPolicy <> debugIOErrorRethrowPolicy)) (\_ -> HandshakeFailure) - (InResponderMode inbgovControlChannel) + (InResponderMode inbgovInfoChannel) + (InResponderMode outgovInfoChannel) $ \connectionManager -> do serverAddr <- Snocket.getLocalAddr snocket socket withAsync @@ -280,7 +284,7 @@ withBidirectionalConnectionManager snocket makeBearer socket serverConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0, serverConnectionManager = connectionManager, serverInboundIdleTimeout = Just protocolIdleTimeout, - serverControlChannel = inbgovControlChannel, + serverInboundInfoChannel = inbgovInfoChannel, serverObservableStateVar = observableStateVar } ) diff --git a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/Core.hs b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/Core.hs index 14955eca5bc..b2b4b2d0a44 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/Core.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/Core.hs @@ -63,6 +63,7 @@ import qualified Ouroboros.Network.ConnectionManager.Types as CM import Ouroboros.Network.InboundGovernor.Event (NewConnectionInfo (..)) import Ouroboros.Network.MuxMode +import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing) import Ouroboros.Network.Server.RateLimiting (AcceptedConnectionsLimit (..)) import Ouroboros.Network.Snocket @@ -137,7 +138,10 @@ data ConnectionManagerArguments handlerTrace socket peerAddr handle handleError -- | Prune policy -- cmPrunePolicy :: PrunePolicy peerAddr (STM m), - cmConnectionsLimits :: AcceptedConnectionsLimit + cmConnectionsLimits :: AcceptedConnectionsLimit, + + -- | How to extract PeerSharing information from versionData + cmGetPeerSharing :: versionData -> PeerSharing } @@ -556,6 +560,9 @@ withConnectionManager -> InResponderMode muxMode (InformationChannel (NewConnectionInfo peerAddr handle) m) -- ^ On outbound duplex connections we need to notify the server about -- a new connection. + -> InResponderMode muxMode (InformationChannel (peerAddr, PeerSharing) m) + -- ^ On inbound duplex connections we need to notify the outbound governor about + -- a new connection. -> (ConnectionManager muxMode socket peerAddr handle handleError m -> m a) -- ^ Continuation which receives the 'ConnectionManager'. It must not leak -- outside of scope of this callback. Once it returns all resources @@ -575,13 +582,15 @@ withConnectionManager ConnectionManagerArguments { cmOutboundIdleTimeout, connectionDataFlow, cmPrunePolicy, - cmConnectionsLimits + cmConnectionsLimits, + cmGetPeerSharing } ConnectionHandler { connectionHandler } classifyHandleError - inboundGovernorControlChannel + inboundGovernorInfoChannel + outboundGovernorInfoChannel k = do ((freshIdSupply, stateVar) :: ( FreshIdSupply m @@ -1167,12 +1176,20 @@ withConnectionManager ConnectionManagerArguments { case mbTransition of Nothing -> return $ Disconnected connId Nothing Just {} -> do - case inboundGovernorControlChannel of - InResponderMode controlChannel -> + case inboundGovernorInfoChannel of + InResponderMode infoChannel -> atomically $ InfoChannel.writeMessage - controlChannel + infoChannel (NewConnectionInfo Inbound connId dataFlow handle) NotInResponderMode -> return () + case (dataFlow, outboundGovernorInfoChannel) of + (Duplex, InResponderMode infoChannel) -> + atomically $ InfoChannel.writeMessage + infoChannel + (peerAddr, cmGetPeerSharing versionData) + + _ -> return () + return $ Connected connId dataFlow handle terminateInboundWithErrorOrQuery connId connVar connThread peerAddr stateVar mutableConnState handleErrorM = do @@ -1756,10 +1773,10 @@ withConnectionManager ConnectionManagerArguments { -- @ let connState' = OutboundDupState connId connThread handle Ticking writeTVar connVar connState' - case inboundGovernorControlChannel of - InResponderMode controlChannel -> + case inboundGovernorInfoChannel of + InResponderMode infoChannel -> InfoChannel.writeMessage - controlChannel + infoChannel (NewConnectionInfo Outbound connId dataFlow handle) NotInResponderMode -> return () return (Just $ mkTransition connState connState') diff --git a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs index 3e3958518c6..f54d2daef76 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs @@ -5,7 +5,6 @@ import Control.Concurrent.Class.MonadSTM.Strict import Data.Functor (($>)) import GHC.Natural (Natural) -import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing) -- | Information channel. -- @@ -38,12 +37,3 @@ newInformationChannel = do -- cc_QUEUE_BOUND :: Natural cc_QUEUE_BOUND = 10 - --- | Control Channel between Server and Outbound Governor. --- --- Control channel that is meant to share inbound connections with the Peer --- Selection Governor. So the consumer is the Governor and Producer is the --- Server. --- -type OutboundGovernorInfoChannel peerAddr m = - InformationChannel (peerAddr, PeerSharing) m diff --git a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs index 9299a6c331e..58594230107 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs @@ -23,6 +23,7 @@ module Ouroboros.Network.InboundGovernor , inboundGovernor -- * Auxiliary Types , InboundGovernorInfoChannel + , OutboundGovernorInfoChannel -- * Trace , InboundGovernorTrace (..) , RemoteSt (..) @@ -65,6 +66,7 @@ import Ouroboros.Network.ConnectionManager.Types hiding import Ouroboros.Network.InboundGovernor.Event import Ouroboros.Network.InboundGovernor.State import Ouroboros.Network.Mux +import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing) import Ouroboros.Network.Server.RateLimiting -- | Run the server, which consists of the following components: @@ -103,8 +105,9 @@ inboundGovernor :: forall (muxMode :: MuxMode) socket peerAddr versionData versi versionNumber ByteString m a b -> StrictTVar m InboundGovernorObservableState -> m Void -inboundGovernor trTracer tracer serverControlChannel inboundIdleTimeout - connectionManager observableStateVar = do +inboundGovernor trTracer tracer inboundInfoChannel + inboundIdleTimeout connectionManager + observableStateVar = do -- State needs to be a TVar, otherwise, when catching the exception inside -- the loop we do not have access to the most recent version of the state -- and might be truncating transitions. @@ -165,7 +168,7 @@ inboundGovernor trTracer tracer serverControlChannel inboundIdleTimeout ) (igsConnections state) <> FirstToFinish ( - NewConnection <$> InfoChannel.readMessage serverControlChannel) + NewConnection <$> InfoChannel.readMessage inboundInfoChannel) (mbConnId, state') <- case event of NewConnection -- new connection has been announced by either accept loop or @@ -514,6 +517,15 @@ runResponder mux type InboundGovernorInfoChannel (muxMode :: MuxMode) peerAddr versionData bytes m a b = InformationChannel (NewConnectionInfo peerAddr (Handle muxMode peerAddr versionData bytes m a b)) m +-- | Control Channel between Server and Outbound Governor. +-- +-- Control channel that is meant to share inbound connections with the Peer +-- Selection Governor. So the consumer is the Governor and Producer is the +-- Server. +-- +type OutboundGovernorInfoChannel peerAddr m = + InformationChannel (peerAddr, PeerSharing) m + -- -- Trace -- diff --git a/ouroboros-network-framework/src/Ouroboros/Network/Protocol/Handshake/Unversioned.hs b/ouroboros-network-framework/src/Ouroboros/Network/Protocol/Handshake/Unversioned.hs index 223597d7c7b..ff8f399b5c5 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/Protocol/Handshake/Unversioned.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/Protocol/Handshake/Unversioned.hs @@ -26,6 +26,7 @@ import Network.TypedProtocol.Codec import Ouroboros.Network.CodecCBORTerm import Ouroboros.Network.ConnectionManager.Types (DataFlow (..)) +import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..)) import Ouroboros.Network.Protocol.Handshake.Codec import Ouroboros.Network.Protocol.Handshake.Type import Ouroboros.Network.Protocol.Handshake.Version @@ -73,27 +74,38 @@ unversionedProtocol = simpleSingletonVersions UnversionedProtocol UnversionedPro -- | Alternative for 'UnversionedProtocolData' which contains 'DataFlow'. -- -newtype DataFlowProtocolData = - DataFlowProtocolData { getProtocolDataFlow :: DataFlow } +data DataFlowProtocolData = + DataFlowProtocolData { + getProtocolDataFlow :: DataFlow, + getProtocolPeerSharing :: PeerSharing + } deriving (Eq, Show) instance Acceptable DataFlowProtocolData where - acceptableVersion (DataFlowProtocolData local) (DataFlowProtocolData remote) = - Accept (DataFlowProtocolData $ local `min` remote) + acceptableVersion (DataFlowProtocolData local _) (DataFlowProtocolData remote ps) = + Accept (DataFlowProtocolData (local `min` remote) ps) instance Queryable DataFlowProtocolData where - queryVersion (DataFlowProtocolData _) = False + queryVersion (DataFlowProtocolData _ _) = False dataFlowProtocolDataCodec :: UnversionedProtocol -> CodecCBORTerm Text DataFlowProtocolData dataFlowProtocolDataCodec _ = CodecCBORTerm {encodeTerm, decodeTerm} where encodeTerm :: DataFlowProtocolData -> CBOR.Term - encodeTerm (DataFlowProtocolData Unidirectional) = CBOR.TBool False - encodeTerm (DataFlowProtocolData Duplex) = CBOR.TBool True + encodeTerm (DataFlowProtocolData Unidirectional NoPeerSharing) = CBOR.TList [CBOR.TBool False, CBOR.TInt 0] + encodeTerm (DataFlowProtocolData Unidirectional PeerSharingPrivate) = CBOR.TList [CBOR.TBool False, CBOR.TInt 1] + encodeTerm (DataFlowProtocolData Unidirectional PeerSharingPublic) = CBOR.TList [CBOR.TBool False, CBOR.TInt 2] + encodeTerm (DataFlowProtocolData Duplex NoPeerSharing) = CBOR.TList [CBOR.TBool True, CBOR.TInt 0] + encodeTerm (DataFlowProtocolData Duplex PeerSharingPrivate) = CBOR.TList [CBOR.TBool True, CBOR.TInt 1] + encodeTerm (DataFlowProtocolData Duplex PeerSharingPublic) = CBOR.TList [CBOR.TBool True, CBOR.TInt 2] decodeTerm :: CBOR.Term -> Either Text DataFlowProtocolData - decodeTerm (CBOR.TBool False) = Right (DataFlowProtocolData Unidirectional) - decodeTerm (CBOR.TBool True) = Right (DataFlowProtocolData Duplex) + decodeTerm (CBOR.TList [CBOR.TBool False, CBOR.TInt 0]) = Right (DataFlowProtocolData Unidirectional NoPeerSharing) + decodeTerm (CBOR.TList [CBOR.TBool False, CBOR.TInt 1]) = Right (DataFlowProtocolData Unidirectional PeerSharingPrivate) + decodeTerm (CBOR.TList [CBOR.TBool False, CBOR.TInt 2]) = Right (DataFlowProtocolData Unidirectional PeerSharingPublic) + decodeTerm (CBOR.TList [CBOR.TBool True, CBOR.TInt 0]) = Right (DataFlowProtocolData Duplex NoPeerSharing) + decodeTerm (CBOR.TList [CBOR.TBool True, CBOR.TInt 1]) = Right (DataFlowProtocolData Duplex PeerSharingPrivate) + decodeTerm (CBOR.TList [CBOR.TBool True, CBOR.TInt 2]) = Right (DataFlowProtocolData Duplex PeerSharingPublic) decodeTerm t = Left $ T.pack $ "unexpected term: " ++ show t dataFlowProtocol :: DataFlow @@ -102,7 +114,7 @@ dataFlowProtocol :: DataFlow DataFlowProtocolData app dataFlowProtocol dataFlow = - simpleSingletonVersions UnversionedProtocol (DataFlowProtocolData dataFlow) + simpleSingletonVersions UnversionedProtocol (DataFlowProtocolData dataFlow NoPeerSharing) -- | 'Handshake' codec used in various tests. -- diff --git a/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs b/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs index 206832b7880..ee24de0cacb 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs @@ -84,7 +84,7 @@ data ServerArguments (muxMode :: MuxMode) socket peerAddr versionData versionNu -- server to run and manage responders which needs to be started on -- inbound connections. -- - serverControlChannel :: InboundGovernorInfoChannel muxMode peerAddr versionData + serverInboundInfoChannel :: InboundGovernorInfoChannel muxMode peerAddr versionData bytes m a b, -- | Observable mutable state. @@ -139,7 +139,7 @@ run ServerArguments { serverLimits@AcceptedConnectionsLimit { acceptedConnectionsHardLimit = hardLimit }, serverInboundIdleTimeout, serverConnectionManager, - serverControlChannel, + serverInboundInfoChannel, serverObservableStateVar } = do let sockets = NonEmpty.toList serverSockets @@ -154,7 +154,7 @@ run ServerArguments { ) inboundGovernor serverTrTracer inboundGovernorTracer - serverControlChannel + serverInboundInfoChannel serverInboundIdleTimeout serverConnectionManager serverObservableStateVar) diff --git a/ouroboros-network-framework/test/Test/Ouroboros/Network/ConnectionManager.hs b/ouroboros-network-framework/test/Test/Ouroboros/Network/ConnectionManager.hs index 450e498fdc5..d3fda0d6789 100644 --- a/ouroboros-network-framework/test/Test/Ouroboros/Network/ConnectionManager.hs +++ b/ouroboros-network-framework/test/Test/Ouroboros/Network/ConnectionManager.hs @@ -67,6 +67,7 @@ import Ouroboros.Network.Snocket (Accept (..), Accepted (..), import Ouroboros.Network.ConnectionManager.InformationChannel (newInformationChannel) import qualified Ouroboros.Network.ConnectionManager.InformationChannel as InfoChannel +import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..)) import TestLib.ConnectionManager (verifyAbstractTransition) @@ -748,7 +749,8 @@ prop_valid_transitions (SkewedBool bindToLocalAddress) scheduleMap = - Debug.traceShowM (t, msg)) --} - inbgovControlChannel <- newInformationChannel + inbgovInfoChannel <- newInformationChannel + outgovInfoChannel <- newInformationChannel let connectionHandler = mkConnectionHandler snocket result <- withConnectionManager ConnectionManagerArguments { @@ -769,11 +771,13 @@ prop_valid_transitions (SkewedBool bindToLocalAddress) scheduleMap = acceptedConnectionsDelay = 0 }, cmTimeWaitTimeout = testTimeWaitTimeout, - cmOutboundIdleTimeout = testOutboundIdleTimeout + cmOutboundIdleTimeout = testOutboundIdleTimeout, + cmGetPeerSharing = \_ -> NoPeerSharing } connectionHandler (\_ -> HandshakeFailure) - (InResponderMode inbgovControlChannel) + (InResponderMode inbgovInfoChannel) + (InResponderMode outgovInfoChannel) $ \(connectionManager :: ConnectionManager InitiatorResponderMode (FD (IOSim s)) Addr (Handle m) Void (IOSim s)) -> do @@ -934,7 +938,7 @@ prop_valid_transitions (SkewedBool bindToLocalAddress) scheduleMap = -- run poor man's server which just reads the control channel, -- otherwise it would block if there are more than 10 connections. - forever (atomically (InfoChannel.readMessage inbgovControlChannel) $> ()) + forever (atomically (InfoChannel.readMessage inbgovInfoChannel) $> ()) `race_` (do a <- accept snocket fd threads <- go [] a (schedule scheduleMap) diff --git a/ouroboros-network-framework/test/Test/Ouroboros/Network/Server2.hs b/ouroboros-network-framework/test/Test/Ouroboros/Network/Server2.hs index b1813f82844..04d04618553 100644 --- a/ouroboros-network-framework/test/Test/Ouroboros/Network/Server2.hs +++ b/ouroboros-network-framework/test/Test/Ouroboros/Network/Server2.hs @@ -366,11 +366,12 @@ withInitiatorOnlyConnectionManager name timeouts trTracer cmTracer snocket makeB cmSnocket = snocket, cmMakeBearer = makeBearer, cmConfigureSocket = \_ _ -> return (), - connectionDataFlow = \_ (DataFlowProtocolData df) -> df, + connectionDataFlow = \_ (DataFlowProtocolData df _) -> df, cmPrunePolicy = simplePrunePolicy, cmConnectionsLimits = acceptedConnLimit, cmTimeWaitTimeout = tTimeWaitTimeout timeouts, - cmOutboundIdleTimeout = tOutboundIdleTimeout timeouts + cmOutboundIdleTimeout = tOutboundIdleTimeout timeouts, + cmGetPeerSharing = \(DataFlowProtocolData _ ps) -> ps } (makeConnectionHandler muxTracer @@ -391,6 +392,7 @@ withInitiatorOnlyConnectionManager name timeouts trTracer cmTracer snocket makeB <> assertRethrowPolicy)) (\_ -> HandshakeFailure) NotInResponderMode + NotInResponderMode (\cm -> k cm `catch` \(e :: SomeException) -> throwIO e) where @@ -527,7 +529,8 @@ withBidirectionalConnectionManager name timeouts handshakeTimeLimits acceptedConnLimit k = do mainThreadId <- myThreadId - inbgovControlChannel <- newInformationChannel + inbgovInfoChannel <- newInformationChannel + outgovInfoChannel <- newInformationChannel -- we are not using the randomness observableStateVar <- Server.newObservableStateVarFromSeed 0 let muxTracer = WithName name `contramap` nullTracer -- mux tracer @@ -549,9 +552,10 @@ withBidirectionalConnectionManager name timeouts cmConfigureSocket = \sock _ -> confSock sock, cmTimeWaitTimeout = tTimeWaitTimeout timeouts, cmOutboundIdleTimeout = tOutboundIdleTimeout timeouts, - connectionDataFlow = \_ (DataFlowProtocolData df) -> df, + connectionDataFlow = \_ (DataFlowProtocolData df _) -> df, cmPrunePolicy = simplePrunePolicy, - cmConnectionsLimits = acceptedConnLimit + cmConnectionsLimits = acceptedConnLimit, + cmGetPeerSharing = \(DataFlowProtocolData _ ps) -> ps } (makeConnectionHandler muxTracer @@ -571,7 +575,8 @@ withBidirectionalConnectionManager name timeouts <> debugIOErrorRethrowPolicy <> assertRethrowPolicy)) (\_ -> HandshakeFailure) - (InResponderMode inbgovControlChannel) + (InResponderMode inbgovInfoChannel) + (InResponderMode outgovInfoChannel) $ \connectionManager -> do serverAddr <- Snocket.getLocalAddr snocket socket @@ -589,7 +594,7 @@ withBidirectionalConnectionManager name timeouts serverConnectionLimits = acceptedConnLimit, serverConnectionManager = connectionManager, serverInboundIdleTimeout = Just (tProtocolIdleTimeout timeouts), - serverControlChannel = inbgovControlChannel, + serverInboundInfoChannel = inbgovInfoChannel, serverObservableStateVar = observableStateVar } ) diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs index 0de4a39d284..77ac7d57a6e 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs @@ -81,14 +81,15 @@ import Data.List (nub) import Ouroboros.Network.ConnectionHandler import Ouroboros.Network.ConnectionManager.Core import Ouroboros.Network.ConnectionManager.InformationChannel - (newInformationChannel) + (InformationChannel (..), newInformationChannel) import Ouroboros.Network.ConnectionManager.Types import Ouroboros.Network.Diffusion.Common hiding (nullTracers) import qualified Ouroboros.Network.Diffusion.Policies as Diffusion.Policies import Ouroboros.Network.Diffusion.Utils import Ouroboros.Network.ExitPolicy import Ouroboros.Network.InboundGovernor (InboundGovernorInfoChannel, - InboundGovernorTrace (..), RemoteTransitionTrace) + InboundGovernorTrace (..), OutboundGovernorInfoChannel, + RemoteTransitionTrace) import Ouroboros.Network.IOManager import Ouroboros.Network.Mux hiding (MiniProtocol (..)) import Ouroboros.Network.MuxMode @@ -107,7 +108,7 @@ import Ouroboros.Network.PeerSelection.Governor.Types import Ouroboros.Network.PeerSelection.LedgerPeers (UseLedgerAfter (..), withLedgerPeers) import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics) -import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing) +import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..)) import Ouroboros.Network.PeerSelection.PeerStateActions (PeerConnectionHandle, PeerSelectionActionsTrace (..), PeerStateActionsArguments (..), pchPeerSharing, @@ -349,6 +350,7 @@ data ConnectionManagerDataInMode peerAddr versionData m a (mode :: MuxMode) wher CMDInInitiatorResponderMode :: InboundGovernorInfoChannel InitiatorResponderMode peerAddr versionData ByteString m a () + -> OutboundGovernorInfoChannel peerAddr m -> StrictTVar m Server.InboundGovernorObservableState -> ConnectionManagerDataInMode peerAddr versionData m a InitiatorResponderMode @@ -732,7 +734,8 @@ runM Interfaces Just localAddr -> Just $ withLocalSocket tracer diNtcGetFileDescriptor diNtcSnocket localAddr $ \localSocket -> do - localControlChannel <- newInformationChannel + localInbInfoChannel <- newInformationChannel + localOutInfoChannel <- newInformationChannel localServerStateVar <- Server.newObservableStateVar ntcInbgovRng let localConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0 @@ -771,14 +774,20 @@ runM Interfaces connectionDataFlow = localDataFlow, cmPrunePolicy = Diffusion.Policies.prunePolicy localServerStateVar, - cmConnectionsLimits = localConnectionLimits + cmConnectionsLimits = localConnectionLimits, + + -- local thread does not start a Outbound Governor + -- so it doesn't matter what we put here. + -- 'NoPeerSharing' is set for all connections. + cmGetPeerSharing = \_ -> NoPeerSharing } withConnectionManager localConnectionManagerArguments localConnectionHandler classifyHandleError - (InResponderMode localControlChannel) + (InResponderMode localInbInfoChannel) + (InResponderMode localOutInfoChannel) $ \(localConnectionManager :: NodeToClientConnectionManager ntcFd ntcAddr ntcVersion ntcVersionData m) @@ -802,7 +811,7 @@ runM Interfaces serverInboundIdleTimeout = Nothing, serverConnectionLimits = localConnectionLimits, serverConnectionManager = localConnectionManager, - serverControlChannel = localControlChannel, + serverInboundInfoChannel = localInbInfoChannel, serverObservableStateVar = localServerStateVar }) Async.wait @@ -829,6 +838,7 @@ runM Interfaces HasInitiatorResponder <$> (CMDInInitiatorResponderMode <$> newInformationChannel + <*> newInformationChannel <*> Server.newObservableStateVar ntnInbgovRng) -- RNGs used for picking random peers from the ledger and for @@ -886,7 +896,8 @@ runM Interfaces -- than limits imposed by 'cmConnectionsLimits'. cmConnectionsLimits = daAcceptedConnectionsLimit, cmTimeWaitTimeout = daTimeWaitTimeout, - cmOutboundIdleTimeout = daProtocolIdleTimeout + cmOutboundIdleTimeout = daProtocolIdleTimeout, + cmGetPeerSharing = diNtnPeerSharing } connectionHandler @@ -907,6 +918,7 @@ runM Interfaces connectionHandler classifyHandleError NotInResponderMode + NotInResponderMode $ \(connectionManager :: NodeToNodeConnectionManager InitiatorMode ntnFd ntnAddr ntnVersionData ntnVersion m a Void) @@ -946,6 +958,7 @@ runM Interfaces daOwnPeerSharing (pchPeerSharing diNtnPeerSharing) (readTVar (getPeerSharingRegistry daPeerSharingRegistry)) + retry -- Will never receive inbound connections peerStateActions requestLedgerPeers $ \localPeerSelectionActionsThread @@ -990,7 +1003,7 @@ runM Interfaces -- Run peer selection and the server. -- HasInitiatorResponder - (CMDInInitiatorResponderMode controlChannel observableStateVar) -> do + (CMDInInitiatorResponderMode inboundInfoChannel outboundInfoChannel observableStateVar) -> do let connectionManagerArguments :: NodeToNodeConnectionManagerArguments InitiatorResponderMode @@ -1013,7 +1026,8 @@ runM Interfaces cmPrunePolicy = Diffusion.Policies.prunePolicy observableStateVar, cmConnectionsLimits = daAcceptedConnectionsLimit, cmTimeWaitTimeout = daTimeWaitTimeout, - cmOutboundIdleTimeout = daProtocolIdleTimeout + cmOutboundIdleTimeout = daProtocolIdleTimeout, + cmGetPeerSharing = diNtnPeerSharing } computePeerSharingPeers :: STM m (PublicPeerSelectionState ntnAddr) @@ -1051,7 +1065,8 @@ runM Interfaces connectionManagerArguments connectionHandler classifyHandleError - (InResponderMode controlChannel) + (InResponderMode inboundInfoChannel) + (InResponderMode outboundInfoChannel) $ \(connectionManager :: NodeToNodeConnectionManager InitiatorResponderMode ntnFd ntnAddr ntnVersionData ntnVersion m a () @@ -1093,6 +1108,7 @@ runM Interfaces daOwnPeerSharing (pchPeerSharing diNtnPeerSharing) (readTVar (getPeerSharingRegistry daPeerSharingRegistry)) + (readMessage outboundInfoChannel) peerStateActions requestLedgerPeers $ \localPeerRootProviderThread @@ -1136,7 +1152,7 @@ runM Interfaces serverConnectionLimits = daAcceptedConnectionsLimit, serverConnectionManager = connectionManager, serverInboundIdleTimeout = Just daProtocolIdleTimeout, - serverControlChannel = controlChannel, + serverInboundInfoChannel = inboundInfoChannel, serverObservableStateVar = observableStateVar }) $ \serverThread -> diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs index b94c7ac7c35..50e14fdf533 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs @@ -580,6 +580,16 @@ peerSelectionGovernorLoop tracer <> ActivePeers.belowTarget actions policy st <> ActivePeers.aboveTarget actions policy st + -- Note that this job is potentially blocking but is non-prioritary. + -- + -- The node could be bombarded with incoming connections and we don't want + -- to hinder it making progress towards the targets. + -- + -- Although we do have rate-limiting of inbound connections it is better + -- to safeguard it by giving it less priority at the governor level. + -- + <> Monitor.inboundPeers actions st + -- There is no rootPeersAboveTarget since the roots target is one sided. -- The changedTargets needs to come before the changedLocalRootPeers in diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Monitor.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Monitor.hs index 7bdecea0d24..22e21fe7b95 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Monitor.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Monitor.hs @@ -13,6 +13,7 @@ module Ouroboros.Network.PeerSelection.Governor.Monitor , jobs , connections , localRoots + , inboundPeers ) where import Data.Map.Strict (Map) @@ -38,6 +39,8 @@ import Ouroboros.Network.PeerSelection.Governor.Types hiding import qualified Ouroboros.Network.PeerSelection.KnownPeers as KnownPeers import Ouroboros.Network.PeerSelection.LedgerPeers (IsLedgerPeer (..)) import qualified Ouroboros.Network.PeerSelection.LocalRootPeers as LocalRootPeers +import Ouroboros.Network.PeerSelection.PeerAdvertise + (PeerAdvertise (..)) import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..)) import Ouroboros.Network.PeerSelection.Types @@ -91,6 +94,31 @@ jobs jobPool st = Completion completion <- JobPool.waitForJob jobPool return (completion st) +-- | Monitor new inbound connections +-- +inboundPeers :: forall m peeraddr peerconn. + (MonadSTM m, Ord peeraddr) + => PeerSelectionActions peeraddr peerconn m + -> PeerSelectionState peeraddr peerconn + -> Guarded (STM m) (TimedDecision m peeraddr peerconn) +inboundPeers PeerSelectionActions{ + readNewInboundConnection + } + st@PeerSelectionState { + knownPeers + } = + Guarded Nothing $ do + (addr, ps) <- readNewInboundConnection + return $ \_ -> + let -- If peer happens to already be present in the Known Peer set + -- 'insert' is going to do its due diligence before adding. + newEntry = Map.singleton addr (ps, DoAdvertisePeer, IsNotLedgerPeer) + knownPeers' = KnownPeers.insert newEntry knownPeers + in Decision { + decisionTrace = [TraceKnownInboundConnection addr ps], + decisionJobs = [], + decisionState = st { knownPeers = knownPeers' } + } -- | Monitor connections. -- diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs index 88b2ea1cc6f..96b5d1570db 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs @@ -204,6 +204,8 @@ data PeerSelectionActions peeraddr peerconn m = PeerSelectionActions { -- readLocalRootPeers :: STM m [(Int, Map peeraddr PeerAdvertise)], + readNewInboundConnection :: STM m (peeraddr, PeerSharing), + -- | Read the current Peer Sharing willingness value -- -- This value comes from the Node's configuration file. @@ -654,6 +656,7 @@ data TracePeerSelection peeraddr = | TracePeerShareRequests Int Int (Set peeraddr) (Set peeraddr) | TracePeerShareResults [(peeraddr, Either SomeException (PeerSharingResult peeraddr))] --TODO: classify failures | TracePeerShareResultsFiltered [peeraddr] + | TraceKnownInboundConnection peeraddr PeerSharing -- | target known peers, actual known peers, selected peer | TraceForgetColdPeers Int Int (Set peeraddr) -- | target established, actual established, selected peers diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Simple.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Simple.hs index a57059e5bea..d6a399d0aca 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Simple.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Simple.hs @@ -67,6 +67,8 @@ withPeerSelectionActions -- ^ Extract peer sharing information from peerconn -> STM m (Map peeraddr (PeerSharingController peeraddr m)) -- ^ peer sharing registry + -> STM m (peeraddr, PeerSharing) + -- ^ Read New Inbound Connections -> PeerStateActions peeraddr peerconn m -> (NumberOfPeers -> m (Maybe (Set peeraddr, DiffTime))) -> ( Async m Void @@ -86,6 +88,7 @@ withPeerSelectionActions peerSharing peerConnToPeerSharing readPeerSharingController + readNewInboundConnections peerStateActions getLedgerPeers k = do @@ -93,6 +96,7 @@ withPeerSelectionActions let peerSelectionActions = PeerSelectionActions { readPeerSelectionTargets = readTargets, readLocalRootPeers = readTVar localRootsVar, + readNewInboundConnection = readNewInboundConnections, peerSharing, peerConnToPeerSharing, requestPublicRootPeers = requestPublicRootPeers, diff --git a/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection.hs b/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection.hs index cc1fbc58c81..bd822147f5e 100644 --- a/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection.hs +++ b/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection.hs @@ -42,9 +42,7 @@ import qualified Data.OrdPSQ as PSQ import System.Random (mkStdGen) import Control.Exception (AssertionFailed (..), catch, evaluate) -import Control.Monad.Class.MonadSTM (STM) -import Control.Monad.Class.MonadTime.SI -import Control.Monad.IOSim +import Control.Monad.Class.MonadSTM (STM, retry) import Control.Tracer (Tracer (..)) import qualified Network.DNS as DNS (defaultResolvConf) @@ -80,6 +78,8 @@ import Test.QuickCheck import Test.Tasty (DependencyType (..), TestTree, after, testGroup) import Test.Tasty.QuickCheck (testProperty) import Text.Pretty.Simple (pPrint) +import Control.Monad.Class.MonadTime.SI +import Control.Monad.IOSim -- Exactly as named. unfHydra :: Int @@ -570,28 +570,29 @@ traceNum TracePublicRootsFailure{} = 04 traceNum TracePeerShareRequests{} = 05 traceNum TracePeerShareResults{} = 06 traceNum TracePeerShareResultsFiltered{} = 07 -traceNum TraceForgetColdPeers{} = 08 -traceNum TracePromoteColdPeers{} = 09 -traceNum TracePromoteColdLocalPeers{} = 10 -traceNum TracePromoteColdFailed{} = 11 -traceNum TracePromoteColdDone{} = 12 -traceNum TracePromoteWarmPeers{} = 13 -traceNum TracePromoteWarmLocalPeers{} = 14 -traceNum TracePromoteWarmFailed{} = 15 -traceNum TracePromoteWarmDone{} = 16 -traceNum TraceDemoteWarmPeers{} = 17 -traceNum TraceDemoteWarmFailed{} = 18 -traceNum TraceDemoteWarmDone{} = 19 -traceNum TraceDemoteHotPeers{} = 20 -traceNum TraceDemoteLocalHotPeers{} = 21 -traceNum TraceDemoteHotFailed{} = 22 -traceNum TraceDemoteHotDone{} = 23 -traceNum TraceDemoteAsynchronous{} = 24 -traceNum TraceGovernorWakeup{} = 25 -traceNum TraceChurnWait{} = 26 -traceNum TraceChurnMode{} = 27 -traceNum TracePromoteWarmAborted{} = 28 -traceNum TraceDemoteLocalAsynchronous{} = 29 +traceNum TraceKnownInboundConnection{} = 08 +traceNum TraceForgetColdPeers{} = 09 +traceNum TracePromoteColdPeers{} = 10 +traceNum TracePromoteColdLocalPeers{} = 11 +traceNum TracePromoteColdFailed{} = 12 +traceNum TracePromoteColdDone{} = 13 +traceNum TracePromoteWarmPeers{} = 14 +traceNum TracePromoteWarmLocalPeers{} = 15 +traceNum TracePromoteWarmFailed{} = 16 +traceNum TracePromoteWarmDone{} = 17 +traceNum TraceDemoteWarmPeers{} = 18 +traceNum TraceDemoteWarmFailed{} = 19 +traceNum TraceDemoteWarmDone{} = 20 +traceNum TraceDemoteHotPeers{} = 21 +traceNum TraceDemoteLocalHotPeers{} = 22 +traceNum TraceDemoteHotFailed{} = 23 +traceNum TraceDemoteHotDone{} = 24 +traceNum TraceDemoteAsynchronous{} = 25 +traceNum TraceGovernorWakeup{} = 26 +traceNum TraceChurnWait{} = 27 +traceNum TraceChurnMode{} = 28 +traceNum TracePromoteWarmAborted{} = 29 +traceNum TraceDemoteLocalAsynchronous{} = 30 allTraceNames :: Map Int String allTraceNames = @@ -604,28 +605,29 @@ allTraceNames = , (05, "TracePeerShareRequests") , (06, "TracePeerShareResults") , (07, "TracePeerShareResultsFiltered") - , (08, "TraceForgetColdPeers") - , (09, "TracePromoteColdPeers") - , (10, "TracePromoteColdLocalPeers") - , (11, "TracePromoteColdFailed") - , (12, "TracePromoteColdDone") - , (13, "TracePromoteWarmPeers") - , (14, "TracePromoteWarmLocalPeers") - , (15, "TracePromoteWarmFailed") - , (16, "TracePromoteWarmDone") - , (17, "TraceDemoteWarmPeers") - , (18, "TraceDemoteWarmFailed") - , (19, "TraceDemoteWarmDone") - , (20, "TraceDemoteHotPeers") - , (21, "TraceDemoteLocalHotPeers") - , (22, "TraceDemoteHotFailed") - , (23, "TraceDemoteHotDone") - , (24, "TraceDemoteAsynchronous") - , (25, "TraceGovernorWakeup") - , (26, "TraceChurnWait") - , (27, "TraceChurnMode") - , (28, "TracePromoteWarmAborted") - , (29, "TraceDemoteAsynchronous") + , (08, "TraceKnownInboundConnection") + , (09, "TraceForgetColdPeers") + , (10, "TracePromoteColdPeers") + , (11, "TracePromoteColdLocalPeers") + , (12, "TracePromoteColdFailed") + , (13, "TracePromoteColdDone") + , (14, "TracePromoteWarmPeers") + , (15, "TracePromoteWarmLocalPeers") + , (16, "TracePromoteWarmFailed") + , (17, "TracePromoteWarmDone") + , (18, "TraceDemoteWarmPeers") + , (19, "TraceDemoteWarmFailed") + , (20, "TraceDemoteWarmDone") + , (21, "TraceDemoteHotPeers") + , (22, "TraceDemoteLocalHotPeers") + , (23, "TraceDemoteHotFailed") + , (24, "TraceDemoteHotDone") + , (25, "TraceDemoteAsynchronous") + , (26, "TraceGovernorWakeup") + , (27, "TraceChurnWait") + , (28, "TraceChurnMode") + , (29, "TracePromoteWarmAborted") + , (30, "TraceDemoteAsynchronous") ] @@ -2282,6 +2284,7 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains peerSharing = requestPeerShare = \_ _ -> return (PeerSharingResult []), peerConnToPeerSharing = \ps -> ps, requestPublicRootPeers = \_ -> return (Map.empty, 0), + readNewInboundConnection = retry, peerStateActions = PeerStateActions { establishPeerConnection = error "establishPeerConnection", monitorPeerConnection = error "monitorPeerConnection", diff --git a/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs b/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs index dc89d4d1cba..dd6c2573c4a 100644 --- a/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs +++ b/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs @@ -57,11 +57,11 @@ import Ouroboros.Network.PeerSelection.Governor hiding (PeerSelectionState (..)) import qualified Ouroboros.Network.PeerSelection.LocalRootPeers as LocalRootPeers -import Ouroboros.Network.Testing.Data.Script (PickScript, - ScriptDelay (..), TimedScript, arbitraryPickScript, - initScript', interpretPickScript, playTimedScript, - prop_shrink_Script, singletonScript, stepScript, - stepScriptSTM) +import Ouroboros.Network.Testing.Data.Script ( + PickScript, Script (..), ScriptDelay (..), TimedScript, + arbitraryPickScript, arbitraryScriptOf, initScript', + interpretPickScript, playTimedScript, prop_shrink_Script, + singletonScript, stepScript) import Ouroboros.Network.Testing.Utils (arbitrarySubset, prop_shrink_nonequal, prop_shrink_valid) @@ -310,6 +310,7 @@ mockPeerSelectionActions' tracer peerConnToPeerSharing = \(PeerConn _ ps _) -> ps, requestPublicRootPeers, readPeerSelectionTargets = readTVar targetsVar, + readNewInboundConnection = retry, requestPeerShare, peerStateActions = PeerStateActions { establishPeerConnection, diff --git a/ouroboros-network/test/Test/Ouroboros/Network/Testnet.hs b/ouroboros-network/test/Test/Ouroboros/Network/Testnet.hs index 4ec60cfde08..fd2c7d48124 100644 --- a/ouroboros-network/test/Test/Ouroboros/Network/Testnet.hs +++ b/ouroboros-network/test/Test/Ouroboros/Network/Testnet.hs @@ -538,6 +538,8 @@ prop_peer_selection_trace_coverage defaultBearerInfo diffScript = "TracePeerShareResults" peerSelectionTraceMap (TracePeerShareResultsFiltered _) = "TracePeerShareResultsFiltered" + peerSelectionTraceMap (TraceKnownInboundConnection addr ps) = + "TraceKnownInboundConnection " ++ show addr ++ " " ++ show ps peerSelectionTraceMap (TraceForgetColdPeers _ _ _) = "TraceForgetColdPeers" peerSelectionTraceMap (TracePromoteColdPeers _ _ _) = From ad98b2b1a46d7ef5bc201a1300463b57363d0629 Mon Sep 17 00:00:00 2001 From: Armando Santos Date: Thu, 26 Jan 2023 10:19:44 +0000 Subject: [PATCH 3/5] Added test to check no self connects --- .../test/Test/Ouroboros/Network/Testnet.hs | 57 +++++++++++++++++++ .../Network/Testnet/Simulation/Node.hs | 14 ++--- 2 files changed, 62 insertions(+), 9 deletions(-) diff --git a/ouroboros-network/test/Test/Ouroboros/Network/Testnet.hs b/ouroboros-network/test/Test/Ouroboros/Network/Testnet.hs index fd2c7d48124..9814c204dac 100644 --- a/ouroboros-network/test/Test/Ouroboros/Network/Testnet.hs +++ b/ouroboros-network/test/Test/Ouroboros/Network/Testnet.hs @@ -142,6 +142,8 @@ tests = , testProperty "cm & ig timeouts enforced" prop_diffusion_timeouts_enforced , testProperty "unit #4177" unit_4177 + , testProperty "never connect to self" + prop_never_connects_to_self #endif #if !defined(mingw32_HOST_OS) , testGroup "coverage" @@ -2692,6 +2694,61 @@ prop_diffusion_timeouts_enforced defaultBearerInfo diffScript = in getAllProperty $ verifyAllTimeouts True transitionSignal +-- | This property checks that a node never connects to itself. +-- +-- Connecting to itself means connecting to exactly the same address and port +-- of a node's listening socket. This is something that in the real world +-- wouldn't happen since the kernel would disallow it. +-- +-- This check is important because our network simulation mock can not disallow +-- such cases, so we try very hard that our diffusion generator will not make +-- create a cenario where a node is connecting to itself. +-- +prop_never_connects_to_self :: AbsBearerInfo + -> DiffusionScript + -> Property +prop_never_connects_to_self absBearerInfo diffScript = + let sim :: forall s . IOSim s Void + sim = diffusionSimulation (toBearerInfo absBearerInfo) + diffScript + iosimTracer + tracerDiffusionSimWithTimeName + + events :: [Trace () DiffusionTestTrace] + events = fmap ( Trace.fromList () + . fmap (\(WithName _ (WithTime _ b)) -> b)) + . Trace.toList + . splitWithNameTrace + . Trace.fromList () + . fmap snd + . Trace.toList + . fmap (\(WithTime t (WithName name b)) + -> (t, WithName name (WithTime t b))) + . withTimeNameTraceEvents + @DiffusionTestTrace + @NtNAddr + . Trace.fromList (MainReturn (Time 0) () []) + . fmap (\(t, tid, tl, te) -> SimEvent t tid tl te) + . take 125000 + . traceEvents + $ runSimTrace sim + + in conjoin (never_connects_to_self <$> events) + + where + never_connects_to_self :: Trace () DiffusionTestTrace -> Property + never_connects_to_self events = + let connectionManagerEvents = Trace.toList + . selectDiffusionConnectionManagerEvents + $ events + + in counterexample (intercalate "\n" . map show $ connectionManagerEvents) + $ all (\ cmt -> case cmt of + TrConnect mbLocalAddr remoteAddr -> mbLocalAddr /= Just remoteAddr + _ -> True + ) + connectionManagerEvents + -- Utils -- diff --git a/ouroboros-network/test/Test/Ouroboros/Network/Testnet/Simulation/Node.hs b/ouroboros-network/test/Test/Ouroboros/Network/Testnet/Simulation/Node.hs index 9af44ebd18b..3f33b76fec5 100644 --- a/ouroboros-network/test/Test/Ouroboros/Network/Testnet/Simulation/Node.hs +++ b/ouroboros-network/test/Test/Ouroboros/Network/Testnet/Simulation/Node.hs @@ -238,7 +238,7 @@ genDomainMap raps selfIP = do m <- mapM (\d -> do size <- chooseInt (1, 5) ips' <- nub <$> vectorOf size (genIP ips) - return (d, delete selfIP ips')) domains + return (d, filter (/= selfIP) ips')) domains return (Map.fromList m) @@ -322,12 +322,8 @@ genNodeArgs :: [RelayAccessPoint] -> (NtNAddr, RelayAccessPoint) -> Gen NodeArgs genNodeArgs raps minConnected genLocalRootPeers (ntnAddr, rap) = do - -- Slot length needs to be greater than 0 else we get a livelock on - -- the IOSim. - -- - -- Quota values matches mainnet, so a slot length of 1s and 1 / 20 - -- chance that someone gets to make a block - let rapsWithoutSelf = delete rap raps + + let rapsWithoutSelf = filter (/= rap) raps (RelayAccessAddress rapIP _) = rap seed <- arbitrary @@ -437,7 +433,7 @@ genNonHotDiffusionScript = do genLocalRootPeers l r = do nrGroups <- chooseInt (1, 3) -- Remove self from local root peers - let newL = l \\ [r] + let newL = filter (/= r) l size = length newL sizePerGroup = (size `div` nrGroups) + 1 @@ -503,7 +499,7 @@ genHotDiffusionScript = do -> Gen [(Int, Map RelayAccessPoint PeerAdvertise)] genLocalRootPeers l r = do -- Remove self from local root peers - let newL = delete r l + let newL = filter (/= r) l size = length newL peerAdvertise <- vectorOf size arbitrary From fea3a5ada669bc6565695edd3449a21041ee36d9 Mon Sep 17 00:00:00 2001 From: Armando Santos Date: Fri, 3 Mar 2023 10:55:47 +0000 Subject: [PATCH 4/5] Small fix --- .../ConnectionManager/InformationChannel.hs | 24 ++++++++++++++++ .../src/Ouroboros/Network/InboundGovernor.hs | 28 +------------------ .../Network/Protocol/Handshake/Unversioned.hs | 2 +- .../src/Ouroboros/Network/Server2.hs | 1 + .../src/Ouroboros/Network/Diffusion/P2P.hs | 6 ++-- 5 files changed, 30 insertions(+), 31 deletions(-) diff --git a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs index f54d2daef76..106a0a28952 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs @@ -1,10 +1,16 @@ {-# LANGUAGE RankNTypes #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE DataKinds #-} module Ouroboros.Network.ConnectionManager.InformationChannel where import Control.Concurrent.Class.MonadSTM.Strict import Data.Functor (($>)) import GHC.Natural (Natural) +import Ouroboros.Network.Mux (MuxMode) +import Ouroboros.Network.InboundGovernor.Event (NewConnectionInfo) +import Ouroboros.Network.ConnectionHandler (Handle) +import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing) -- | Information channel. -- @@ -19,6 +25,24 @@ data InformationChannel a m = writeMessage :: a -> STM m () } +-- | A Server control channel which instantiates to 'NewConnection' and 'Handle'. +-- +-- It allows to pass 'STM' transactions which will resolve to 'NewConnection'. +-- Server's monitoring thread is the consumer of these messages; there are two +-- producers: accept loop and connection handler for outbound connections. +-- +type InboundGovernorInfoChannel (muxMode :: MuxMode) peerAddr versionData bytes m a b = + InformationChannel (NewConnectionInfo peerAddr (Handle muxMode peerAddr versionData bytes m a b)) m + +-- | Control Channel between Server and Outbound Governor. +-- +-- Control channel that is meant to share inbound connections with the Peer +-- Selection Governor. So the consumer is the Governor and Producer is the +-- Server. +-- +type OutboundGovernorInfoChannel peerAddr m = + InformationChannel (peerAddr, PeerSharing) m + newInformationChannel :: forall a m. MonadLabelledSTM m diff --git a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs index 58594230107..2b6175dded7 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs @@ -21,9 +21,6 @@ module Ouroboros.Network.InboundGovernor , newObservableStateVarFromSeed -- * Run Inbound Protocol Governor , inboundGovernor - -- * Auxiliary Types - , InboundGovernorInfoChannel - , OutboundGovernorInfoChannel -- * Trace , InboundGovernorTrace (..) , RemoteSt (..) @@ -59,14 +56,13 @@ import Ouroboros.Network.Channel (fromChannel) import Ouroboros.Network.ConnectionHandler import Ouroboros.Network.ConnectionId (ConnectionId (..)) import Ouroboros.Network.ConnectionManager.InformationChannel - (InformationChannel) + (InboundGovernorInfoChannel) import qualified Ouroboros.Network.ConnectionManager.InformationChannel as InfoChannel import Ouroboros.Network.ConnectionManager.Types hiding (TrUnexpectedlyFalseAssertion) import Ouroboros.Network.InboundGovernor.Event import Ouroboros.Network.InboundGovernor.State import Ouroboros.Network.Mux -import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing) import Ouroboros.Network.Server.RateLimiting -- | Run the server, which consists of the following components: @@ -504,28 +500,6 @@ runResponder mux startStrategy (runMuxPeer responder . fromChannel) --- --- Auxiliary Types --- - --- | A Server control channel which instantiates to 'NewConnection' and 'Handle'. --- --- It allows to pass 'STM' transactions which will resolve to 'NewConnection'. --- Server's monitoring thread is the consumer of these messages; there are two --- producers: accept loop and connection handler for outbound connections. --- -type InboundGovernorInfoChannel (muxMode :: MuxMode) peerAddr versionData bytes m a b = - InformationChannel (NewConnectionInfo peerAddr (Handle muxMode peerAddr versionData bytes m a b)) m - --- | Control Channel between Server and Outbound Governor. --- --- Control channel that is meant to share inbound connections with the Peer --- Selection Governor. So the consumer is the Governor and Producer is the --- Server. --- -type OutboundGovernorInfoChannel peerAddr m = - InformationChannel (peerAddr, PeerSharing) m - -- -- Trace -- diff --git a/ouroboros-network-framework/src/Ouroboros/Network/Protocol/Handshake/Unversioned.hs b/ouroboros-network-framework/src/Ouroboros/Network/Protocol/Handshake/Unversioned.hs index ff8f399b5c5..ed6f0030bba 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/Protocol/Handshake/Unversioned.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/Protocol/Handshake/Unversioned.hs @@ -76,7 +76,7 @@ unversionedProtocol = simpleSingletonVersions UnversionedProtocol UnversionedPro -- data DataFlowProtocolData = DataFlowProtocolData { - getProtocolDataFlow :: DataFlow, + getProtocolDataFlow :: DataFlow, getProtocolPeerSharing :: PeerSharing } deriving (Eq, Show) diff --git a/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs b/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs index ee24de0cacb..3df3abf0b44 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs @@ -54,6 +54,7 @@ import Ouroboros.Network.InboundGovernor import Ouroboros.Network.Mux import Ouroboros.Network.Server.RateLimiting import Ouroboros.Network.Snocket +import Ouroboros.Network.ConnectionManager.InformationChannel (InboundGovernorInfoChannel) -- diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs index 77ac7d57a6e..a906df0377b 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs @@ -81,14 +81,14 @@ import Data.List (nub) import Ouroboros.Network.ConnectionHandler import Ouroboros.Network.ConnectionManager.Core import Ouroboros.Network.ConnectionManager.InformationChannel - (InformationChannel (..), newInformationChannel) + (InformationChannel (..), newInformationChannel, InboundGovernorInfoChannel, OutboundGovernorInfoChannel) import Ouroboros.Network.ConnectionManager.Types import Ouroboros.Network.Diffusion.Common hiding (nullTracers) import qualified Ouroboros.Network.Diffusion.Policies as Diffusion.Policies import Ouroboros.Network.Diffusion.Utils import Ouroboros.Network.ExitPolicy -import Ouroboros.Network.InboundGovernor (InboundGovernorInfoChannel, - InboundGovernorTrace (..), OutboundGovernorInfoChannel, +import Ouroboros.Network.InboundGovernor ( + InboundGovernorTrace (..), RemoteTransitionTrace) import Ouroboros.Network.IOManager import Ouroboros.Network.Mux hiding (MiniProtocol (..)) From 1d2b488461ed150a211f6d21523a016c328cd4e8 Mon Sep 17 00:00:00 2001 From: Armando Santos Date: Thu, 11 May 2023 15:05:25 +0100 Subject: [PATCH 5/5] Stylish --- .../Network/ConnectionManager/InformationChannel.hs | 12 ++++++------ .../src/Ouroboros/Network/Server2.hs | 3 ++- .../src/Ouroboros/Network/Diffusion/P2P.hs | 6 +++--- .../test/Test/Ouroboros/Network/PeerSelection.hs | 4 ++-- .../Network/PeerSelection/MockEnvironment.hs | 10 +++++----- 5 files changed, 18 insertions(+), 17 deletions(-) diff --git a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs index 106a0a28952..a7d9c4cb860 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/ConnectionManager/InformationChannel.hs @@ -1,16 +1,16 @@ -{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE DataKinds #-} {-# LANGUAGE KindSignatures #-} -{-# LANGUAGE DataKinds #-} +{-# LANGUAGE RankNTypes #-} module Ouroboros.Network.ConnectionManager.InformationChannel where import Control.Concurrent.Class.MonadSTM.Strict import Data.Functor (($>)) import GHC.Natural (Natural) -import Ouroboros.Network.Mux (MuxMode) -import Ouroboros.Network.InboundGovernor.Event (NewConnectionInfo) -import Ouroboros.Network.ConnectionHandler (Handle) -import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing) +import Ouroboros.Network.ConnectionHandler (Handle) +import Ouroboros.Network.InboundGovernor.Event (NewConnectionInfo) +import Ouroboros.Network.Mux (MuxMode) +import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing) -- | Information channel. -- diff --git a/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs b/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs index 3df3abf0b44..550b9c43834 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/Server2.hs @@ -49,12 +49,13 @@ import Foreign.C.Error #endif import Ouroboros.Network.ConnectionHandler +import Ouroboros.Network.ConnectionManager.InformationChannel + (InboundGovernorInfoChannel) import Ouroboros.Network.ConnectionManager.Types import Ouroboros.Network.InboundGovernor import Ouroboros.Network.Mux import Ouroboros.Network.Server.RateLimiting import Ouroboros.Network.Snocket -import Ouroboros.Network.ConnectionManager.InformationChannel (InboundGovernorInfoChannel) -- diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs index a906df0377b..ae05cdeb418 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs @@ -81,14 +81,14 @@ import Data.List (nub) import Ouroboros.Network.ConnectionHandler import Ouroboros.Network.ConnectionManager.Core import Ouroboros.Network.ConnectionManager.InformationChannel - (InformationChannel (..), newInformationChannel, InboundGovernorInfoChannel, OutboundGovernorInfoChannel) + (InboundGovernorInfoChannel, InformationChannel (..), + OutboundGovernorInfoChannel, newInformationChannel) import Ouroboros.Network.ConnectionManager.Types import Ouroboros.Network.Diffusion.Common hiding (nullTracers) import qualified Ouroboros.Network.Diffusion.Policies as Diffusion.Policies import Ouroboros.Network.Diffusion.Utils import Ouroboros.Network.ExitPolicy -import Ouroboros.Network.InboundGovernor ( - InboundGovernorTrace (..), +import Ouroboros.Network.InboundGovernor (InboundGovernorTrace (..), RemoteTransitionTrace) import Ouroboros.Network.IOManager import Ouroboros.Network.Mux hiding (MiniProtocol (..)) diff --git a/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection.hs b/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection.hs index bd822147f5e..dd98bea3ae9 100644 --- a/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection.hs +++ b/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection.hs @@ -68,6 +68,8 @@ import Test.Ouroboros.Network.PeerSelection.MockEnvironment hiding import Test.Ouroboros.Network.PeerSelection.PeerGraph import Control.Concurrent.Class.MonadSTM.Strict (newTVarIO) +import Control.Monad.Class.MonadTime.SI +import Control.Monad.IOSim import Ouroboros.Network.PeerSelection.LedgerPeers (IsLedgerPeer (..)) import Ouroboros.Network.PeerSelection.PeerAdvertise (PeerAdvertise (..)) @@ -78,8 +80,6 @@ import Test.QuickCheck import Test.Tasty (DependencyType (..), TestTree, after, testGroup) import Test.Tasty.QuickCheck (testProperty) import Text.Pretty.Simple (pPrint) -import Control.Monad.Class.MonadTime.SI -import Control.Monad.IOSim -- Exactly as named. unfHydra :: Int diff --git a/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs b/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs index dd6c2573c4a..168aa10ca4f 100644 --- a/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs +++ b/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs @@ -57,11 +57,11 @@ import Ouroboros.Network.PeerSelection.Governor hiding (PeerSelectionState (..)) import qualified Ouroboros.Network.PeerSelection.LocalRootPeers as LocalRootPeers -import Ouroboros.Network.Testing.Data.Script ( - PickScript, Script (..), ScriptDelay (..), TimedScript, - arbitraryPickScript, arbitraryScriptOf, initScript', - interpretPickScript, playTimedScript, prop_shrink_Script, - singletonScript, stepScript) +import Ouroboros.Network.Testing.Data.Script (PickScript, Script (..), + ScriptDelay (..), TimedScript, arbitraryPickScript, + arbitraryScriptOf, initScript', interpretPickScript, + playTimedScript, prop_shrink_Script, singletonScript, + stepScript) import Ouroboros.Network.Testing.Utils (arbitrarySubset, prop_shrink_nonequal, prop_shrink_valid)