Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PeerSharingAPI fix, master branch #4841

Merged
merged 5 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ouroboros-network/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

## next version

### Breaking changes

* `newPeerSharingAPI` requires `PublicPeerSelectionState` variable to be passed to it.
* `Diffusion.Arguments` requires `PublicPeerSelectionState`; the integration
code should make sure both `newPeerSharingAPI` and diffusion receives the
same mutable variable.
* `TracePeerShareRequest` also includes the number of requests peers.

## 0.13.1.0 -- 2023-03-20

### Breaking changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ import Ouroboros.Network.Diffusion qualified as Diff
import Ouroboros.Network.Diffusion.P2P qualified as Diff.P2P
import Ouroboros.Network.ExitPolicy (RepromoteDelay (..))
import Ouroboros.Network.NodeToNode.Version (DiffusionMode (..))
import Ouroboros.Network.PeerSelection.Governor (PeerSelectionTargets (..))
import Ouroboros.Network.PeerSelection.Governor (PeerSelectionTargets (..),
PublicPeerSelectionState (..))
import Ouroboros.Network.PeerSelection.PeerMetric
(PeerMetricsConfiguration (..), newPeerMetric)
import Ouroboros.Network.Protocol.Handshake (HandshakeArguments (..))
Expand Down Expand Up @@ -101,7 +102,6 @@ import Ouroboros.Network.PeerSelection.RelayAccessPoint (DomainAccessPoint,
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions (DNSLookupType)
import Ouroboros.Network.PeerSelection.State.LocalRootPeers (HotValency,
WarmValency)
import Ouroboros.Network.PeerSharing (PeerSharingRegistry (PeerSharingRegistry))
import Test.Ouroboros.Network.Diffusion.Node.ChainDB (addBlock,
getBlockPointSet)
import Test.Ouroboros.Network.Diffusion.Node.MiniProtocols qualified as Node
Expand Down Expand Up @@ -198,8 +198,6 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
useBootstrapPeersScriptVar <- newTVarIO (aReadUseBootstrapPeers na)
peerMetrics <- newPeerMetric PeerMetricsConfiguration { maxEntriesToTrack = 180 }

peerSharingRegistry <- PeerSharingRegistry <$> newTVarIO mempty

let -- diffusion interfaces
interfaces :: Diff.P2P.Interfaces (NtNFD m) NtNAddr NtNVersion NtNVersionData
(NtCFD m) NtCAddr NtCVersion NtCVersionData
Expand Down Expand Up @@ -263,7 +261,7 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
-- fetch mode is not used (no block-fetch mini-protocol)
, Diff.P2P.daBlockFetchMode = pure FetchModeDeadline
, Diff.P2P.daReturnPolicy = \_ -> config_REPROMOTE_DELAY
, Diff.P2P.daPeerSharingRegistry = peerSharingRegistry
, Diff.P2P.daPeerSharingRegistry = nkPeerSharingRegistry nodeKernel
}

let apps = Node.applications (aDebugTracer na) nodeKernel Node.cborCodecs limits appArgs blockHeader
Expand All @@ -272,7 +270,8 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
(Diff.P2P.runM interfaces
Diff.nullTracers
tracersExtra
args (argsExtra useBootstrapPeersScriptVar) apps appsExtra)
(mkArgs (nkPublicPeerSelectionVar nodeKernel))
(mkArgsExtra useBootstrapPeersScriptVar) apps appsExtra)
$ \ diffusionThread ->
withAsync (blockFetch nodeKernel) $ \blockFetchLogicThread ->
wait diffusionThread
Expand Down Expand Up @@ -373,18 +372,21 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
decodeData _ (CBOR.TList [CBOR.TBool True, CBOR.TInt a]) = NtNVersionData InitiatorAndResponderDiffusionMode <$> (toPeerSharing a)
decodeData _ _ = Left (Text.pack "unversionedDataCodec: unexpected term")

args :: Diff.Arguments (NtNFD m) NtNAddr (NtCFD m) NtCAddr
args = Diff.Arguments
mkArgs :: StrictTVar m (PublicPeerSelectionState NtNAddr)
-> Diff.Arguments m (NtNFD m) NtNAddr (NtCFD m) NtCAddr
mkArgs daPublicPeerSelectionVar = Diff.Arguments
{ Diff.daIPv4Address = Right <$> (ntnToIPv4 . aIPAddress) na
, Diff.daIPv6Address = Right <$> (ntnToIPv6 . aIPAddress) na
, Diff.daLocalAddress = Nothing
, Diff.daAcceptedConnectionsLimit
= aAcceptedLimits na
, Diff.daMode = aDiffusionMode na
, Diff.daPublicPeerSelectionVar
}

argsExtra :: StrictTVar m (Script UseBootstrapPeers) -> Diff.P2P.ArgumentsExtra m
argsExtra ubpVar = Diff.P2P.ArgumentsExtra
mkArgsExtra :: StrictTVar m (Script UseBootstrapPeers)
-> Diff.P2P.ArgumentsExtra m
mkArgsExtra ubpVar = Diff.P2P.ArgumentsExtra
{ Diff.P2P.daPeerSelectionTargets = aPeerSelectionTargets na
, Diff.P2P.daReadLocalRootPeers = aReadLocalRootPeers na
, Diff.P2P.daReadPublicRootPeers = aReadPublicRootPeers na
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ applications debugTracer nodeKernel
peerSharingInitiator =
MiniProtocolCb $
\ ExpandedInitiatorContext {
eicConnectionId = ConnectionId { remoteAddress = them },
eicConnectionId = connId@ConnectionId { remoteAddress = them },
eicControlMessage = controlMessageSTM
}
channel
Expand All @@ -573,7 +573,7 @@ applications debugTracer nodeKernel
$ \controller -> do
psClient <- peerSharingClient controlMessageSTM controller
runPeerWithLimits
nullTracer
((show . (connId,)) `contramap` debugTracer)
peerSharingCodec
(peerSharingSizeLimits limits)
(peerSharingTimeLimits limits)
Expand All @@ -583,10 +583,10 @@ applications debugTracer nodeKernel
peerSharingResponder
:: PeerSharingAPI NtNAddr s m
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
peerSharingResponder psAPI = MiniProtocolCb $ \_ctx channel -> do
peerSharingResponder psAPI = MiniProtocolCb $ \ResponderContext { rcConnectionId = connId } channel -> do
labelThisThread "PeerSharingServer"
runPeerWithLimits
nullTracer
((show . (connId,)) `contramap` debugTracer)
peerSharingCodec
(peerSharingSizeLimits limits)
(peerSharingTimeLimits limits)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ import Codec.CBOR.Decoding qualified as CBOR
import Codec.CBOR.Encoding qualified as CBOR
import Ouroboros.Network.Mock.Chain (Chain (..))
import Ouroboros.Network.NodeToNode ()
import Ouroboros.Network.PeerSelection.Governor (PublicPeerSelectionState,
makePublicPeerSelectionStateVar)
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing)
import Ouroboros.Network.PeerSelection.RelayAccessPoint (RelayAccessPoint (..))
import Ouroboros.Network.PeerSharing (PeerSharingAPI, PeerSharingRegistry (..),
Expand Down Expand Up @@ -264,21 +266,27 @@ data NodeKernel header block s m = NodeKernel {

nkChainDB :: ChainDB block m,

nkPeerSharingAPI :: PeerSharingAPI NtNAddr s m
nkPeerSharingAPI :: PeerSharingAPI NtNAddr s m,

nkPublicPeerSelectionVar :: StrictTVar m (PublicPeerSelectionState NtNAddr)
}

newNodeKernel :: ( MonadSTM m
, RandomGen s
)
=> s -> m (NodeKernel header block s m)
newNodeKernel rng = NodeKernel
<$> newTVarIO Map.empty
<*> newTVarIO (ChainProducerState Chain.Genesis Map.empty 0)
<*> newFetchClientRegistry
<*> newPeerSharingRegistry
<*> ChainDB.newChainDB
<*> newPeerSharingAPI rng ps_POLICY_PEER_SHARE_STICKY_TIME
ps_POLICY_PEER_SHARE_MAX_PEERS
newNodeKernel rng = do
publicStateVar <- makePublicPeerSelectionStateVar
NodeKernel
<$> newTVarIO Map.empty
<*> newTVarIO (ChainProducerState Chain.Genesis Map.empty 0)
<*> newFetchClientRegistry
<*> newPeerSharingRegistry
<*> ChainDB.newChainDB
<*> newPeerSharingAPI publicStateVar rng
ps_POLICY_PEER_SHARE_STICKY_TIME
ps_POLICY_PEER_SHARE_MAX_PEERS
<*> pure publicStateVar

-- | Register a new upstream chain-sync client.
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1527,7 +1527,7 @@ recentPeerShareActivity d =
-- schedule it to be removed again at time d+t. We arrange for the change in
-- the recent set to happen after the peer sharing event.
go !recentSet !recentPSQ
(E (TS t i) (GovernorEvent (TracePeerShareRequests _ _ _ addrs)) : txs) =
(E (TS t i) (GovernorEvent (TracePeerShareRequests _ _ _ _ addrs)) : txs) =
let recentSet' = recentSet <> addrs
recentPSQ' = foldl' (\q a -> PSQ.insert a t' () q) recentPSQ addrs
t' = d `addTime` t
Expand Down Expand Up @@ -3331,7 +3331,7 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap
DNS.defaultResolvConf
readDomains
(ioDNSActions LookupReqAAndAAAA) $ \requestPublicRootPeers -> do
publicStateVar <- newTVarIO (emptyPublicPeerSelectionState @SockAddr)
publicStateVar <- makePublicPeerSelectionStateVar
debugVar <- newTVarIO $ emptyPeerSelectionState (mkStdGen 42) []
peerSelectionGovernor
tracer tracer tracer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ runGovernorInMockEnvironment mockEnv =

governorAction :: GovernorMockEnvironment -> IOSim s Void
governorAction mockEnv = do
publicStateVar <- StrictTVar.newTVarIO emptyPublicPeerSelectionState
publicStateVar <- makePublicPeerSelectionStateVar
lsjVar <- playTimedScript (contramap TraceEnvSetLedgerStateJudgement tracerMockEnv)
(ledgerStateJudgement mockEnv)
usbVar <- playTimedScript (contramap TraceEnvSetUseBootstrapPeers tracerMockEnv)
Expand Down Expand Up @@ -610,7 +610,7 @@ tracerTracePeerSelection = contramap f tracerTestTraceEvent
f a@(TraceBigLedgerPeersResults !_ !_ !_) = GovernorEvent a
f a@(TraceBigLedgerPeersFailure !_ !_ !_) = GovernorEvent a
f a@(TraceForgetBigLedgerPeers !_ !_ !_) = GovernorEvent a
f a@(TracePeerShareRequests !_ !_ !_ !_) = GovernorEvent a
f a@(TracePeerShareRequests !_ !_ !_ !_ !_) = GovernorEvent a
f a@(TracePeerShareResults !_) = GovernorEvent a
f a@(TracePeerShareResultsFiltered !_) = GovernorEvent a
f a@(TraceKnownInboundConnection !_ !_) = GovernorEvent a
Expand Down
Loading
Loading