Skip to content

Commit

Permalink
Address #4782
Browse files Browse the repository at this point in the history
  • Loading branch information
bolt12 committed Mar 6, 2024
1 parent 68742e7 commit 3b56b4e
Show file tree
Hide file tree
Showing 12 changed files with 167 additions and 116 deletions.
9 changes: 8 additions & 1 deletion ouroboros-network/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

### Non-Breaking changes

* Refactored `computePeerSharingPeers` and moved it to
`Ouroboros.Network.Peersharing`
* Added `PeerSharingAPI` with all the things necessary to run peer sharing.

## 0.12.0.0 -- 2023-02-21

### Breaking changes
Expand All @@ -23,6 +27,9 @@
peer selection upon getting a USR1 sig.
* Changed withPeerSelectionActions and withLedgerPeers signatures

* Removed `computePeers` callback in `daApplicationInitiatorAndResponderMode`.
* Changed `peerSharingServer` to require `PeerSharingAPI`.

### Non-breaking changes

* Limit the rate at which one can discover peers through peersharing.
Expand Down Expand Up @@ -56,7 +63,7 @@

* `PeerSharingController` is now private and `requestPeers` is exported

* Fix hot demototion by having blockfetch give chainsync a chance to exit
* Fix hot demotion by having blockfetch give chainsync a chance to exit
cleanly before killing it.

* Disable mean reward for new peers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
<> wait blockFetchLogicThread
<> wait nodeKernelThread
where
blockFetch :: NodeKernel BlockHeader Block m
blockFetch :: NodeKernel BlockHeader Block s m
-> m Void
blockFetch nodeKernel = do
blockFetchLogic
Expand All @@ -295,7 +295,7 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
bfcSalt = 0
})

blockFetchPolicy :: NodeKernel BlockHeader Block m
blockFetchPolicy :: NodeKernel BlockHeader Block s m
-> BlockFetchConsensusInterface NtNAddr BlockHeader Block m
blockFetchPolicy nodeKernel =
BlockFetchConsensusInterface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import Data.ByteString.Lazy (ByteString)
import Data.Functor (($>))
import Data.Maybe (fromMaybe)
import Data.Void (Void)
import System.Random (StdGen)
import System.Random (RandomGen, StdGen)

import Codec.CBOR.Read qualified as CBOR
import Codec.Serialise qualified as Serialise
Expand Down Expand Up @@ -89,13 +89,12 @@ import Ouroboros.Network.NodeToNode (blockFetchMiniProtocolNum,
peerSharingMiniProtocolNum)
import Ouroboros.Network.PeerSelection.LedgerPeers
import Ouroboros.Network.PeerSelection.PeerSharing qualified as PSTypes
import Ouroboros.Network.PeerSharing (bracketPeerSharingClient,
import Ouroboros.Network.PeerSharing (PeerSharingAPI, bracketPeerSharingClient,
peerSharingClient, peerSharingServer)
import Ouroboros.Network.Protocol.PeerSharing.Client (peerSharingClientPeer)
import Ouroboros.Network.Protocol.PeerSharing.Codec (codecPeerSharing)
import Ouroboros.Network.Protocol.PeerSharing.Server (peerSharingServerPeer)
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharing,
PeerSharingAmount (..))
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharing)
import Test.Ouroboros.Network.Diffusion.Node.NodeKernel


Expand Down Expand Up @@ -211,7 +210,7 @@ data AppArgs header block m = AppArgs

-- | Protocol handlers.
--
applications :: forall block header m.
applications :: forall block header s m.
( Alternative (STM m)
, MonadAsync m
, MonadFork m
Expand All @@ -228,9 +227,10 @@ applications :: forall block header m.
, Show block
, ShowProxy block
, ShowProxy header
, RandomGen s
)
=> Tracer m String
-> NodeKernel header block m
-> NodeKernel header block s m
-> Codecs NtNAddr header block m
-> LimitsAndTimeouts header block
-> AppArgs header block m
Expand Down Expand Up @@ -260,10 +260,10 @@ applications debugTracer nodeKernel
simpleSingletonVersions UnversionedProtocol
(NtNVersionData InitiatorOnlyDiffusionMode aaOwnPeerSharing)
initiatorApp
, Diff.daApplicationInitiatorResponderMode = \computePeers ->
, Diff.daApplicationInitiatorResponderMode =
simpleSingletonVersions UnversionedProtocol
(NtNVersionData aaDiffusionMode aaOwnPeerSharing)
(initiatorAndResponderApp computePeers)
initiatorAndResponderApp
, Diff.daLocalResponderApplication =
simpleSingletonVersions UnversionedProtocol
UnversionedProtocolData
Expand All @@ -275,7 +275,7 @@ applications debugTracer nodeKernel
initiatorApp
:: OuroborosBundleWithExpandedCtx InitiatorMode NtNAddr ByteString m () Void
-- initiator mode will never run a peer sharing responder side
initiatorApp = fmap f <$> initiatorAndResponderApp (error "impossible happened!")
initiatorApp = fmap f <$> initiatorAndResponderApp
where
f :: MiniProtocolWithExpandedCtx InitiatorResponderMode NtNAddr ByteString m () ()
-> MiniProtocolWithExpandedCtx InitiatorMode NtNAddr ByteString m () Void
Expand All @@ -291,10 +291,8 @@ applications debugTracer nodeKernel
}

initiatorAndResponderApp
:: (PeerSharingAmount -> m [NtNAddr])
-- ^ Peer Sharing result computation callback
-> OuroborosBundleWithExpandedCtx InitiatorResponderMode NtNAddr ByteString m () ()
initiatorAndResponderApp computePeers = TemperatureBundle
:: OuroborosBundleWithExpandedCtx InitiatorResponderMode NtNAddr ByteString m () ()
initiatorAndResponderApp = TemperatureBundle
{ withHot = WithHot
[ MiniProtocol
{ miniProtocolNum = chainSyncMiniProtocolNum
Expand Down Expand Up @@ -339,7 +337,7 @@ applications debugTracer nodeKernel
, miniProtocolRun =
InitiatorAndResponderProtocol
peerSharingInitiator
(peerSharingResponder computePeers)
(peerSharingResponder (nkPeerSharingAPI nodeKernel))
}
]
else []
Expand Down Expand Up @@ -583,9 +581,9 @@ applications debugTracer nodeKernel
(peerSharingClientPeer psClient)

peerSharingResponder
:: (PeerSharingAmount -> m [NtNAddr])
:: PeerSharingAPI NtNAddr s m
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
peerSharingResponder f = MiniProtocolCb $ \_ctx channel -> do
peerSharingResponder psAPI = MiniProtocolCb $ \_ctx channel -> do
labelThisThread "PeerSharingServer"
runPeerWithLimits
nullTracer
Expand All @@ -594,7 +592,7 @@ applications debugTracer nodeKernel
(peerSharingTimeLimits limits)
channel
$ peerSharingServerPeer
$ peerSharingServer f
$ peerSharingServer psAPI


--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import Data.Typeable (Typeable)
import Data.Void (Void)
import Numeric.Natural (Natural)

import System.Random (StdGen, randomR)
import System.Random (RandomGen, StdGen, randomR, split)

import Data.Monoid.Synchronisation

Expand Down Expand Up @@ -80,8 +80,9 @@ import Ouroboros.Network.Mock.Chain (Chain (..))
import Ouroboros.Network.NodeToNode ()
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing)
import Ouroboros.Network.PeerSelection.RelayAccessPoint (RelayAccessPoint (..))
import Ouroboros.Network.PeerSharing (PeerSharingRegistry (..),
newPeerSharingRegistry)
import Ouroboros.Network.PeerSharing (PeerSharingAPI, PeerSharingRegistry (..),
newPeerSharingAPI, newPeerSharingRegistry,
ps_POLICY_PEER_SHARE_MAX_PEERS, ps_POLICY_PEER_SHARE_STICKY_TIME)
import Test.Ouroboros.Network.Diffusion.Node.ChainDB (ChainDB (..))
import Test.Ouroboros.Network.Diffusion.Node.ChainDB qualified as ChainDB
import Test.QuickCheck (Arbitrary (..), choose, chooseInt, frequency, oneof)
Expand Down Expand Up @@ -248,7 +249,7 @@ randomBlockGenerationArgs bgaSlotDuration bgaSeed quota =
, bgaSeed
}

data NodeKernel header block m = NodeKernel {
data NodeKernel header block s m = NodeKernel {
-- | upstream chains
nkClientChains
:: StrictTVar m (Map NtNAddr (StrictTVar m (Chain header))),
Expand All @@ -261,21 +262,28 @@ data NodeKernel header block m = NodeKernel {

nkPeerSharingRegistry :: PeerSharingRegistry NtNAddr m,

nkChainDB :: ChainDB block m
nkChainDB :: ChainDB block m,

nkPeerSharingAPI :: PeerSharingAPI NtNAddr s m
}

newNodeKernel :: MonadSTM m => m (NodeKernel header block m)
newNodeKernel = NodeKernel
newNodeKernel :: ( MonadSTM m
, RandomGen s
)
=> s -> m (NodeKernel header block s m)
newNodeKernel rng = NodeKernel
<$> newTVarIO Map.empty
<*> newTVarIO (ChainProducerState Chain.Genesis Map.empty 0)
<*> newFetchClientRegistry
<*> newPeerSharingRegistry
<*> ChainDB.newChainDB
<*> newPeerSharingAPI rng ps_POLICY_PEER_SHARE_STICKY_TIME
ps_POLICY_PEER_SHARE_MAX_PEERS

-- | Register a new upstream chain-sync client.
--
registerClientChains :: MonadSTM m
=> NodeKernel header block m
=> NodeKernel header block s m
-> NtNAddr
-> m (StrictTVar m (Chain header))
registerClientChains NodeKernel { nkClientChains } peerAddr = atomically $ do
Expand All @@ -287,7 +295,7 @@ registerClientChains NodeKernel { nkClientChains } peerAddr = atomically $ do
-- | Unregister an upstream chain-sync client.
--
unregisterClientChains :: MonadSTM m
=> NodeKernel header block m
=> NodeKernel header block s m
-> NtNAddr
-> m ()
unregisterClientChains NodeKernel { nkClientChains } peerAddr = atomically $
Expand Down Expand Up @@ -349,19 +357,21 @@ withNodeKernelThread
, MonadThrow m
, MonadThrow (STM m)
, HasFullHeader block
, RandomGen seed
)
=> BlockGeneratorArgs block seed
-> (NodeKernel header block m -> Async m Void -> m a)
-> (NodeKernel header block seed m -> Async m Void -> m a)
-- ^ The continuation which has a handle to the chain selection \/ block
-- production thread. The thread might throw an exception.
-> m a
withNodeKernelThread BlockGeneratorArgs { bgaSlotDuration, bgaBlockGenerator, bgaSeed }
k = do
kernel <- newNodeKernel
let (_, psSeed) = split bgaSeed
kernel <- newNodeKernel psSeed
withSlotTime bgaSlotDuration $ \waitForSlot ->
withAsync (blockProducerThread kernel waitForSlot) (k kernel)
where
blockProducerThread :: NodeKernel header block m
blockProducerThread :: NodeKernel header block seed m
-> (SlotNo -> STM m SlotNo)
-> m Void
blockProducerThread NodeKernel { nkChainProducerState, nkChainDB }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3377,8 +3377,6 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap
policyPeerShareBatchWaitTime = 0, -- seconds
policyPeerShareOverallTimeout = 0, -- seconds
policyPeerShareActivationDelay = 2, -- seconds
policyPeerShareStickyTime = 1, --seconds
policyPeerShareMaxPeers = 10,
policyErrorDelay = 0 -- seconds
}
pickTrivially :: Applicative m => Set SockAddr -> Int -> m (Set SockAddr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,12 +575,9 @@ mockPeerSelectionPolicy GovernorMockEnvironment {
policyPeerShareBatchWaitTime = 3, -- seconds
policyPeerShareOverallTimeout = 10, -- seconds
policyPeerShareActivationDelay = 300, -- seconds
policyPeerShareStickyTime = 257, -- seconds
policyPeerShareMaxPeers = 10,
policyErrorDelay = 10 -- seconds
policyErrorDelay = 10 -- seconds
}


--
-- Utils for properties
--
Expand Down
4 changes: 1 addition & 3 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import Ouroboros.Network.NodeToNode (AcceptedConnectionsLimit, ConnectionId,
import Ouroboros.Network.NodeToNode qualified as NodeToNode
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
(LedgerPeersConsensusInterface)
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharingAmount)
import Ouroboros.Network.Snocket (FileDescriptor)
import Ouroboros.Network.Socket (SystemdSocketTracer)

Expand Down Expand Up @@ -164,8 +163,7 @@ data Applications ntnAddr ntnVersion ntnVersionData
--
, daApplicationInitiatorResponderMode
-- Peer Sharing result computation callback
:: (PeerSharingAmount -> m [ntnAddr])
-> Versions ntnVersion
:: Versions ntnVersion
ntnVersionData
(OuroborosBundleWithExpandedCtx
InitiatorResponderMode ntnAddr
Expand Down
3 changes: 1 addition & 2 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/NonP2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,7 @@ run Tracers
-- [].
(mkResponderApp
<$> daApplicationInitiatorResponderMode
applications
(\_ -> pure []))
applications)
remoteErrorPolicy
)
runIpSubscriptionWorker :: SocketSnocket
Expand Down
Loading

0 comments on commit 3b56b4e

Please sign in to comment.