Skip to content

Commit

Permalink
Integrate [ouroboros-network #4795](IntersectMBO/ouroboros-network#4795)
Browse files Browse the repository at this point in the history
  • Loading branch information
bolt12 authored and dnadales committed Mar 25, 2024
1 parent 2e74e37 commit c2f5ebf
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import Control.Monad.Class.MonadTimer.SI (MonadTimer)
import Control.Tracer
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BSL
import Data.Hashable (Hashable)
import Data.Int (Int64)
import Data.Map.Strict (Map)
import Data.Void (Void)
Expand Down Expand Up @@ -110,8 +111,7 @@ import Ouroboros.Network.Protocol.PeerSharing.Codec
codecPeerSharingId, timeLimitsPeerSharing)
import Ouroboros.Network.Protocol.PeerSharing.Server
(PeerSharingServer, peerSharingServerPeer)
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharing,
PeerSharingAmount)
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharing)
import Ouroboros.Network.Protocol.TxSubmission2.Client
import Ouroboros.Network.Protocol.TxSubmission2.Codec
import Ouroboros.Network.Protocol.TxSubmission2.Server
Expand Down Expand Up @@ -204,16 +204,14 @@ mkHandlers ::
, HasTxId (GenTx blk)
, LedgerSupportsProtocol blk
, Ord addrNTN
, Hashable addrNTN
)
=> NodeKernelArgs m addrNTN addrNTC blk
-> NodeKernel m addrNTN addrNTC blk
-> (PeerSharingAmount -> m [addrNTN])
-- ^ Peer Sharing result computation callback
-> Handlers m addrNTN blk
mkHandlers
NodeKernelArgs {chainSyncFutureCheck, keepAliveRng, miniProtocolParameters}
NodeKernel {getChainDB, getMempool, getTopLevelConfig, getTracers = tracers}
computePeers =
NodeKernel {getChainDB, getMempool, getTopLevelConfig, getTracers = tracers, getPeerSharingAPI} =
Handlers {
hChainSyncClient = \peer _isBigLedgerpeer dynEnv ->
CsClient.chainSyncClient
Expand Down Expand Up @@ -257,7 +255,7 @@ mkHandlers
, hKeepAliveClient = \_version -> keepAliveClient (Node.keepAliveClientTracer tracers) keepAliveRng
, hKeepAliveServer = \_version _peer -> keepAliveServer
, hPeerSharingClient = \_version controlMessageSTM _peer -> peerSharingClient controlMessageSTM
, hPeerSharingServer = \_version _peer -> peerSharingServer computePeers
, hPeerSharingServer = \_version _peer -> peerSharingServer getPeerSharingAPI
}

{-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,13 @@ import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing)
import Ouroboros.Network.PeerSelection.PeerSharing.Codec
(decodeRemoteAddress, encodeRemoteAddress)
import Ouroboros.Network.Protocol.Limits (shortWait)
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharingAmount)
import Ouroboros.Network.RethrowPolicy
import System.Exit (ExitCode (..))
import System.FilePath ((</>))
import System.FS.API (SomeHasFS (..))
import System.FS.API.Types
import System.FS.IO (ioHasFS)
import System.Random (StdGen, newStdGen, randomIO, randomRIO)
import System.Random (StdGen, newStdGen, randomIO, randomRIO, split)

{-------------------------------------------------------------------------------
The arguments to the Consensus Layer node functionality
Expand Down Expand Up @@ -462,8 +461,6 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
-> (NodeToNodeVersion -> addrNTN -> CBOR.Encoding)
-> (NodeToNodeVersion -> forall s . CBOR.Decoder s addrNTN)
-> BlockNodeToNodeVersion blk
-> (PeerSharingAmount -> m [addrNTN])
-- ^ Peer Sharing result computation callback
-> NTN.Apps m
addrNTN
ByteString
Expand All @@ -473,15 +470,15 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
ByteString
NodeToNodeInitiatorResult
()
mkNodeToNodeApps nodeKernelArgs nodeKernel peerMetrics encAddrNTN decAddrNTN version computePeers =
mkNodeToNodeApps nodeKernelArgs nodeKernel peerMetrics encAddrNTN decAddrNTN version =
NTN.mkApps
nodeKernel
rnTraceNTN
(NTN.defaultCodecs codecConfig version encAddrNTN decAddrNTN)
NTN.byteLimits
llrnChainSyncTimeout
(reportMetric Diffusion.peerMetricsConfiguration peerMetrics)
(NTN.mkHandlers nodeKernelArgs nodeKernel computePeers)
(NTN.mkHandlers nodeKernelArgs nodeKernel)

mkNodeToClientApps
:: NodeKernelArgs m addrNTN (ConnectionId addrNTC) blk
Expand All @@ -500,8 +497,6 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
:: NetworkP2PMode p2p
-> MiniProtocolParameters
-> ( BlockNodeToNodeVersion blk
-- Peer Sharing result computation callback
-> (PeerSharingAmount -> m [addrNTN])
-> NTN.Apps
m
addrNTN
Expand Down Expand Up @@ -564,16 +559,16 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
-- Initiator side won't start responder side of Peer
-- Sharing protocol so we give a dummy implementation
-- here.
$ ntnApps blockVersion (error "impossible happened!"))
$ ntnApps blockVersion)
| (version, blockVersion) <- Map.toList llrnNodeToNodeVersions
],
Diffusion.daApplicationInitiatorResponderMode = \computePeers ->
Diffusion.daApplicationInitiatorResponderMode =
combineVersions
[ simpleSingletonVersions
version
llrnVersionDataNTN
(NTN.initiatorAndResponder miniProtocolParams version rnPeerSharing
$ ntnApps blockVersion computePeers)
$ ntnApps blockVersion)
| (version, blockVersion) <- Map.toList llrnNodeToNodeVersions
],
Diffusion.daLocalResponderApplication =
Expand Down Expand Up @@ -704,7 +699,7 @@ mkNodeKernelArgs
registry
bfcSalt
gsmAntiThunderingHerd
keepAliveRng
rng
cfg
tracers
btime
Expand All @@ -715,6 +710,7 @@ mkNodeKernelArgs
gsmMarkerFileView
getUseBootstrapPeers
= do
let (kaRng, psRng) = split rng
return NodeKernelArgs
{ tracers
, registry
Expand All @@ -727,14 +723,15 @@ mkNodeKernelArgs
, mempoolCapacityOverride = NoMempoolCapacityBytesOverride
, miniProtocolParameters = defaultMiniProtocolParameters
, blockFetchConfiguration = defaultBlockFetchConfiguration
, keepAliveRng
, gsmArgs = GsmNodeKernelArgs {
gsmAntiThunderingHerd
, gsmDurationUntilTooOld
, gsmMarkerFileView
, gsmMinCaughtUpDuration = maxCaughtUpAge
}
, getUseBootstrapPeers
, keepAliveRng = kaRng
, peerSharingRng = psRng
}
where
defaultBlockFetchConfiguration :: BlockFetchConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ import Ouroboros.Network.NodeToNode (ConnectionId,
import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers)
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
(LedgerStateJudgement (..))
import Ouroboros.Network.PeerSharing (PeerSharingRegistry,
newPeerSharingRegistry)
import Ouroboros.Network.PeerSharing (PeerSharingAPI,
PeerSharingRegistry, newPeerSharingAPI,
newPeerSharingRegistry, ps_POLICY_PEER_SHARE_MAX_PEERS,
ps_POLICY_PEER_SHARE_STICKY_TIME)
import Ouroboros.Network.TxSubmission.Inbound
(TxSubmissionMempoolWriter)
import qualified Ouroboros.Network.TxSubmission.Inbound as Inbound
Expand Down Expand Up @@ -144,7 +146,9 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel {
--
-- When set with the empty list '[]' block forging will be disabled.
--
, setBlockForging :: [BlockForging m blk] -> m ()
, setBlockForging :: [BlockForging m blk] -> m ()

, getPeerSharingAPI :: PeerSharingAPI addrNTN StdGen m
}

-- | Arguments required when initializing a node
Expand All @@ -163,6 +167,7 @@ data NodeKernelArgs m addrNTN addrNTC blk = NodeKernelArgs {
, keepAliveRng :: StdGen
, gsmArgs :: GsmNodeKernelArgs m blk
, getUseBootstrapPeers :: STM m UseBootstrapPeers
, peerSharingRng :: StdGen
}

initNodeKernel ::
Expand All @@ -180,6 +185,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
, chainDB, initChainDB
, blockFetchConfiguration
, gsmArgs
, peerSharingRng
} = do
-- using a lazy 'TVar', 'BlockForging' does not have a 'NoThunks' instance.
blockForgingVar :: LazySTM.TMVar m [BlockForging m blk] <- LazySTM.newTMVarIO []
Expand Down Expand Up @@ -237,6 +243,10 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
TooOld -> GSM.enterOnlyBootstrap gsm
YoungEnough -> GSM.enterCaughtUp gsm

peerSharingAPI <- newPeerSharingAPI peerSharingRng
ps_POLICY_PEER_SHARE_STICKY_TIME
ps_POLICY_PEER_SHARE_MAX_PEERS

void $ forkLinkedThread registry "NodeKernel.blockForging" $
blockForgingController st (LazySTM.takeTMVar blockForgingVar)

Expand All @@ -262,6 +272,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
, getPeerSharingRegistry = peerSharingRegistry
, getTracers = tracers
, setBlockForging = \a -> atomically . LazySTM.putTMVar blockForgingVar $! a
, getPeerSharingAPI = peerSharingAPI
}
where
blockForgingController :: InternalState m remotePeer localPeer blk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharing)
import Ouroboros.Network.Protocol.TxSubmission2.Type
import qualified System.FS.Sim.MockFS as Mock
import System.FS.Sim.MockFS (MockFS)
import System.Random (mkStdGen)
import System.Random (mkStdGen, split)
import Test.ThreadNet.TxGen
import Test.ThreadNet.Util.NodeJoinPlan
import Test.ThreadNet.Util.NodeRestarts
Expand Down Expand Up @@ -969,8 +969,9 @@ runThreadNetwork systemTime ThreadNetworkArgs
, hfbtMaxClockRewind = secondsToNominalDiffTime 0
}

let kaRng = case seed of
let rng = case seed of
Seed s -> mkStdGen s
(kaRng, psRng) = split rng
let nodeKernelArgs = NodeKernelArgs
{ tracers
, registry
Expand All @@ -985,6 +986,7 @@ runThreadNetwork systemTime ThreadNetworkArgs
, blockFetchSize = estimateBlockSize
, mempoolCapacityOverride = NoMempoolCapacityBytesOverride
, keepAliveRng = kaRng
, peerSharingRng = psRng
, miniProtocolParameters = MiniProtocolParameters {
chainSyncPipeliningHighMark = 4,
chainSyncPipeliningLowMark = 2,
Expand Down Expand Up @@ -1039,7 +1041,7 @@ runThreadNetwork systemTime ThreadNetworkArgs
-- The purpose of this test is not testing protocols, so
-- returning constant empty list is fine if we have thorough
-- tests about the peer sharing protocol itself.
(NTN.mkHandlers nodeKernelArgs nodeKernel (\_ -> return []))
(NTN.mkHandlers nodeKernelArgs nodeKernel)

-- In practice, a robust wallet/user can persistently add a transaction
-- until it appears on the chain. This thread adds robustness for the
Expand Down

0 comments on commit c2f5ebf

Please sign in to comment.