Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Networking] Handling iHave overpromising part-1 #4556

Merged
merged 32 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6aadc4d
adds scoring parameters for mesh message delivery
yhassanzadeh13 Jul 5, 2023
b6a4630
refactors config-based approach to override-based approach
yhassanzadeh13 Jul 5, 2023
60d39d4
generates mocks
yhassanzadeh13 Jul 5, 2023
7742875
adds a documentation
yhassanzadeh13 Jul 5, 2023
c8b3a96
extends godoc
yhassanzadeh13 Jul 5, 2023
e3ce3cc
adds test skeleton
yhassanzadeh13 Jul 5, 2023
76d05a2
fixes test
yhassanzadeh13 Jul 5, 2023
ea4309b
implements under-performing test
yhassanzadeh13 Jul 6, 2023
1cbe332
adds test for under-delivery in two topics
yhassanzadeh13 Jul 6, 2023
a31489d
refactors test fixture
yhassanzadeh13 Jul 6, 2023
c763b97
wip
yhassanzadeh13 Jul 10, 2023
572f1d0
err fix
yhassanzadeh13 Jul 11, 2023
d589916
applies refactoring
yhassanzadeh13 Jul 11, 2023
7c21e79
fixes test
yhassanzadeh13 Jul 12, 2023
93e31f8
parallelize the fixture
yhassanzadeh13 Jul 12, 2023
9ed306d
adds logs for overriding parameters
yhassanzadeh13 Jul 12, 2023
2fef6e8
adds logging for network type to builder
yhassanzadeh13 Jul 12, 2023
889ade0
fmt
yhassanzadeh13 Jul 12, 2023
6da4f4c
renames a fixture function
yhassanzadeh13 Jul 12, 2023
6ae3d82
adds warn logging for overriding score parameters
yhassanzadeh13 Jul 12, 2023
f31d2da
revises godocs
yhassanzadeh13 Jul 12, 2023
2509a7a
adds readme
yhassanzadeh13 Jul 12, 2023
0788b8b
fixes scoring test
yhassanzadeh13 Jul 12, 2023
85eb327
fixing lint
yhassanzadeh13 Jul 12, 2023
6f7eb69
lint fix
yhassanzadeh13 Jul 12, 2023
c52a3eb
Merge remote-tracking branch 'origin/master' into yahya/6459-ihave-br…
yhassanzadeh13 Jul 12, 2023
0310507
improves test quality
yhassanzadeh13 Jul 12, 2023
8386009
Merge branch 'master' into yahya/6459-ihave-broken-promises
yhassanzadeh13 Jul 14, 2023
af39aa4
Merge branch 'master' into yahya/6459-ihave-broken-promises
yhassanzadeh13 Jul 17, 2023
273624a
Merge branch 'master' into yahya/6459-ihave-broken-promises
yhassanzadeh13 Jul 17, 2023
493a8f5
Merge branch 'master' into yahya/6459-ihave-broken-promises
yhassanzadeh13 Jul 18, 2023
0625970
Update network/p2p/builder.go
yhassanzadeh13 Jul 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions insecure/corruptlibp2p/spam_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func TestSpam_IHave(t *testing.T) {
// this is vital as the spammer will circumvent the normal pubsub subscription mechanism and send iHAVE messages directly to the victim.
// without a prior connection established, directly spamming pubsub messages may cause a race condition in the pubsub implementation.
p2ptest.TryConnectionAndEnsureConnected(t, ctx, nodes)
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, func() (interface{}, channels.Topic) {
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId)
return unittest.ProposalFixture(), blockTopic
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId)
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, blockTopic, 1, func() interface{} {
return unittest.ProposalFixture()
})

// prepare to spam - generate iHAVE control messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ func startNodesAndEnsureConnected(t *testing.T, ctx irrecoverable.SignalerContex
// this is vital as the spammer will circumvent the normal pubsub subscription mechanism and send iHAVE messages directly to the victim.
// without a prior connection established, directly spamming pubsub messages may cause a race condition in the pubsub implementation.
p2ptest.TryConnectionAndEnsureConnected(t, ctx, nodes)
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, func() (interface{}, channels.Topic) {
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkID)
return unittest.ProposalFixture(), blockTopic
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkID)
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, blockTopic, 1, func() interface{} {
return unittest.ProposalFixture()
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,7 @@ func TestGossipSubSpamMitigationIntegration(t *testing.T) {
t.Name(),
idProvider,
p2ptest.WithRole(flow.RoleConsensus),
p2ptest.WithPeerScoringEnabled(idProvider),
p2ptest.EnablePeerScoringWithOverride(p2p.PeerScoringConfigNoOverride),
)

ids := flow.IdentityList{&victimId, &spammer.SpammerId}
Expand Down Expand Up @@ -1196,9 +1196,9 @@ func TestGossipSubSpamMitigationIntegration(t *testing.T) {
p2ptest.LetNodesDiscoverEachOther(t, ctx, nodes, ids)

// as nodes started fresh and no spamming has happened yet, the nodes should be able to exchange messages on the topic.
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, func() (interface{}, channels.Topic) {
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkID)
return unittest.ProposalFixture(), blockTopic
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkID)
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, blockTopic, 1, func() interface{} {
return unittest.ProposalFixture()
})

// prepares spam graft and prune messages with different strategies.
Expand Down Expand Up @@ -1228,9 +1228,8 @@ func TestGossipSubSpamMitigationIntegration(t *testing.T) {

// now we expect the detection and mitigation to kick in and the victim node to disconnect from the spammer node.
// so the spammer and victim nodes should not be able to exchange messages on the topic.
p2ptest.EnsureNoPubsubExchangeBetweenGroups(t, ctx, []p2p.LibP2PNode{victimNode}, []p2p.LibP2PNode{spammer.SpammerNode}, func() (interface{}, channels.Topic) {
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkID)
return unittest.ProposalFixture(), blockTopic
p2ptest.EnsureNoPubsubExchangeBetweenGroups(t, ctx, []p2p.LibP2PNode{victimNode}, []p2p.LibP2PNode{spammer.SpammerNode}, blockTopic, 1, func() interface{} {
return unittest.ProposalFixture()
})
}

Expand Down

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ import (
// and private (i.e., staked) networks.
type NetworkingType uint8

func (t NetworkingType) String() string {
switch t {
case PrivateNetwork:
return "private"
case PublicNetwork:
return "public"
default:
return "unknown"
}
}

const (
// PrivateNetwork indicates that the staked private-side of the Flow blockchain that nodes can only join and leave
// with a staking requirement.
Expand Down
64 changes: 43 additions & 21 deletions network/p2p/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ type GossipSubAdapterConfigFunc func(*BasePubSubAdapterConfig) PubSubAdapterConf

// GossipSubBuilder provides a builder pattern for creating a GossipSub pubsub system.
type GossipSubBuilder interface {
PeerScoringBuilder
// SetHost sets the host of the builder.
// If the host has already been set, a fatal error is logged.
SetHost(host.Host)
Expand All @@ -45,9 +44,16 @@ type GossipSubBuilder interface {
// We expect the node to initialize with a default gossipsub config. Hence, this function overrides the default config.
SetGossipSubConfigFunc(GossipSubAdapterConfigFunc)

// SetGossipSubPeerScoring sets the gossipsub peer scoring of the builder.
// If the gossipsub peer scoring flag has already been set, a fatal error is logged.
SetGossipSubPeerScoring(bool)
// EnableGossipSubScoringWithOverride enables peer scoring for the GossipSub pubsub system with the given override.
// Any existing peer scoring config attribute that is set in the override will override the default peer scoring config.
// Anything that is left to nil or zero value in the override will be ignored and the default value will be used.
// Note: it is not recommended to override the default peer scoring config in production unless you know what you are doing.
// Production Tip: use PeerScoringConfigNoOverride as the argument to this function to enable peer scoring without any override.
// Args:
// - PeerScoringConfigOverride: override for the peer scoring config- Recommended to use PeerScoringConfigNoOverride for production.
// Returns:
// none
EnableGossipSubScoringWithOverride(*PeerScoringConfigOverride)

// SetGossipSubScoreTracerInterval sets the gossipsub score tracer interval of the builder.
// If the gossipsub score tracer interval has already been set, a fatal error is logged.
Expand Down Expand Up @@ -81,16 +87,6 @@ type GossipSubBuilder interface {
Build(irrecoverable.SignalerContext) (PubSubAdapter, error)
}

type PeerScoringBuilder interface {
// SetTopicScoreParams sets the topic score parameters for the given topic.
// If the topic score parameters have already been set for the given topic, it is overwritten.
SetTopicScoreParams(topic channels.Topic, topicScoreParams *pubsub.TopicScoreParams)

// SetAppSpecificScoreParams sets the application specific score parameters for the given topic.
// If the application specific score parameters have already been set for the given topic, it is overwritten.
SetAppSpecificScoreParams(func(peer.ID) float64)
}

// GossipSubRpcInspectorSuiteFactoryFunc is a function that creates a new RPC inspector suite. It is used to create
// RPC inspectors for the gossipsub protocol. The RPC inspectors are used to inspect and validate
// incoming RPC messages before they are processed by the gossipsub protocol.
Expand Down Expand Up @@ -123,11 +119,16 @@ type NodeBuilder interface {
SetConnectionGater(ConnectionGater) NodeBuilder
SetRoutingSystem(func(context.Context, host.Host) (routing.Routing, error)) NodeBuilder

// EnableGossipSubPeerScoring enables peer scoring for the GossipSub pubsub system.
// Arguments:
// - module.IdentityProvider: the identity provider for the node (must be set before calling this method).
// - *PeerScoringConfig: the peer scoring configuration for the GossipSub pubsub system. If nil, the default configuration is used.
EnableGossipSubPeerScoring(*PeerScoringConfig) NodeBuilder
// EnableGossipSubScoringWithOverride enables peer scoring for the GossipSub pubsub system with the given override.
// Any existing peer scoring config attribute that is set in the override will override the default peer scoring config.
// Anything that is left to nil or zero value in the override will be ignored and the default value will be used.
// Note: it is not recommended to override the default peer scoring config in production unless you know what you are doing.
// Production Tip: use PeerScoringConfigNoOverride as the argument to this function to enable peer scoring without any override.
// Args:
// - PeerScoringConfigOverride: override for the peer scoring config- Recommended to use PeerScoringConfigNoOverride for production.
// Returns:
// none
EnableGossipSubScoringWithOverride(*PeerScoringConfigOverride) NodeBuilder
SetCreateNode(CreateNodeFunc) NodeBuilder
SetGossipSubFactory(GossipSubFactoryFunc, GossipSubAdapterConfigFunc) NodeBuilder
SetStreamCreationRetryInterval(time.Duration) NodeBuilder
Expand All @@ -138,10 +139,31 @@ type NodeBuilder interface {
Build() (LibP2PNode, error)
}

// PeerScoringConfig is a configuration for peer scoring parameters for a GossipSub pubsub system.
type PeerScoringConfig struct {
// PeerScoringConfigOverride is a structure that is used to carry over the override values for peer scoring configuration.
// Any attribute that is set in the override will override the default peer scoring config.
// Typically, we are not recommending to override the default peer scoring config in production unless you know what you are doing.
type PeerScoringConfigOverride struct {
// TopicScoreParams is a map of topic score parameters for each topic.
// Override criteria: any topic (i.e., key in the map) will override the default topic score parameters for that topic and
// the corresponding value in the map will be used instead of the default value.
// If you don't want to override topic score params for a given topic, simply don't include that topic in the map.
// If the map is nil, the default topic score parameters are used for all topics.
TopicScoreParams map[channels.Topic]*pubsub.TopicScoreParams

// AppSpecificScoreParams is a function that returns the application specific score parameters for a given peer.
// Override criteria: if the function is not nil, it will override the default application specific score parameters.
// If the function is nil, the default application specific score parameters are used.
AppSpecificScoreParams func(peer.ID) float64

// DecayInterval is the interval over which we decay the effect of past behavior. So that
// a good or bad behavior will not have a permanent effect on the penalty. It is also interval
// that GossipSub refreshes the scores of all peers.
yhassanzadeh13 marked this conversation as resolved.
Show resolved Hide resolved
// Override criteria: if the value is not zero, it will override the default decay interval.
// If the value is zero, the default decay interval is used.
DecayInterval time.Duration
}

// PeerScoringConfigNoOverride is a default peer scoring configuration for a GossipSub pubsub system.
// It is set to nil, which means that no override is done to the default peer scoring configuration.
// It is the recommended way to use the default peer scoring configuration.
var PeerScoringConfigNoOverride = (*PeerScoringConfigOverride)(nil)
13 changes: 7 additions & 6 deletions network/p2p/connection/connection_gater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,19 +396,20 @@ func TestConnectionGater_Disallow_Integration(t *testing.T) {
func ensureCommunicationSilenceAmongGroups(t *testing.T, ctx context.Context, sporkId flow.Identifier, groupA []p2p.LibP2PNode, groupB []p2p.LibP2PNode) {
// ensures no connection, unicast, or pubsub going to the disallow-listed nodes
p2ptest.EnsureNotConnectedBetweenGroups(t, ctx, groupA, groupB)
p2ptest.EnsureNoPubsubExchangeBetweenGroups(t, ctx, groupA, groupB, func() (interface{}, channels.Topic) {
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId)
return unittest.ProposalFixture(), blockTopic

blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId)
p2ptest.EnsureNoPubsubExchangeBetweenGroups(t, ctx, groupA, groupB, blockTopic, 1, func() interface{} {
return unittest.ProposalFixture()
})
p2pfixtures.EnsureNoStreamCreationBetweenGroups(t, ctx, groupA, groupB)
}

// ensureCommunicationOverAllProtocols ensures that all nodes are connected to each other, and they can exchange messages over the pubsub and unicast.
func ensureCommunicationOverAllProtocols(t *testing.T, ctx context.Context, sporkId flow.Identifier, nodes []p2p.LibP2PNode, inbounds []chan string) {
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId)
p2ptest.TryConnectionAndEnsureConnected(t, ctx, nodes)
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, func() (interface{}, channels.Topic) {
blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId)
return unittest.ProposalFixture(), blockTopic
p2ptest.EnsurePubsubMessageExchange(t, ctx, nodes, blockTopic, 1, func() interface{} {
return unittest.ProposalFixture()
})
p2pfixtures.EnsureMessageExchangeOverUnicast(t, ctx, nodes, inbounds, p2pfixtures.LongStringMessageFactoryFixture(t))
}
2 changes: 1 addition & 1 deletion network/p2p/connection/peerManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type PeerManager struct {
// and it uses the connector to actually connect or disconnect from peers.
func NewPeerManager(logger zerolog.Logger, updateInterval time.Duration, connector p2p.PeerUpdater) *PeerManager {
pm := &PeerManager{
logger: logger,
logger: logger.With().Str("component", "peer-manager").Logger(),
connector: connector,
peerRequestQ: make(chan struct{}, 1),
peerUpdateInterval: updateInterval,
Expand Down
22 changes: 4 additions & 18 deletions network/p2p/mock/gossip_sub_builder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions network/p2p/mock/node_builder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading