diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 55fd48ace8e..e87f1e6327d 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -802,6 +802,7 @@ func (builder *FlowAccessNodeBuilder) initNetwork(nodeID module.Local, IdentityProvider: builder.IdentityProvider, ReceiveCache: receiveCache, ConduitFactory: conduit.NewDefaultConduitFactory(), + SporkId: builder.SporkID, AlspCfg: &alspmgr.MisbehaviorReportManagerConfig{ Logger: builder.Logger, SpamRecordCacheSize: builder.FlowConfig.NetworkConfig.AlspConfig.SpamRecordCacheSize, @@ -1404,7 +1405,7 @@ func (builder *FlowAccessNodeBuilder) initMiddleware(nodeID flow.Identifier, Libp2pNode: libp2pNode, FlowId: nodeID, BitSwapMetrics: builder.Metrics.Bitswap, - RootBlockID: builder.SporkID, + SporkId: builder.SporkID, UnicastMessageTimeout: middleware.DefaultUnicastTimeout, IdTranslator: builder.IDTranslator, Codec: builder.CodecFactory(), diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index a8e7abdd4b6..1e294e70d01 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -493,6 +493,7 @@ func (builder *ObserverServiceBuilder) initNetwork(nodeID module.Local, IdentityProvider: builder.IdentityProvider, ReceiveCache: receiveCache, ConduitFactory: conduit.NewDefaultConduitFactory(), + SporkId: builder.SporkID, AlspCfg: &alspmgr.MisbehaviorReportManagerConfig{ Logger: builder.Logger, SpamRecordCacheSize: builder.FlowConfig.NetworkConfig.AlspConfig.SpamRecordCacheSize, @@ -1031,7 +1032,7 @@ func (builder *ObserverServiceBuilder) initMiddleware(nodeID flow.Identifier, Libp2pNode: libp2pNode, FlowId: nodeID, BitSwapMetrics: builder.Metrics.Bitswap, - RootBlockID: builder.SporkID, + SporkId: builder.SporkID, UnicastMessageTimeout: middleware.DefaultUnicastTimeout, IdTranslator: builder.IDTranslator, Codec: builder.CodecFactory(), diff --git a/cmd/scaffold.go b/cmd/scaffold.go index 07e2d63011e..61d9680f524 100644 --- a/cmd/scaffold.go +++ b/cmd/scaffold.go @@ -440,7 +440,7 @@ func (fnb *FlowNodeBuilder) InitFlowNetworkWithConduitFactory( Libp2pNode: fnb.LibP2PNode, FlowId: fnb.Me.NodeID(), BitSwapMetrics: fnb.Metrics.Bitswap, - RootBlockID: fnb.SporkID, + SporkId: fnb.SporkID, UnicastMessageTimeout: fnb.FlowConfig.NetworkConfig.UnicastMessageTimeout, IdTranslator: fnb.IDTranslator, Codec: fnb.CodecFactory(), @@ -465,6 +465,7 @@ func (fnb *FlowNodeBuilder) InitFlowNetworkWithConduitFactory( Logger: fnb.Logger, Codec: fnb.CodecFactory(), Me: fnb.Me, + SporkId: fnb.SporkID, MiddlewareFactory: func() (network.Middleware, error) { return fnb.Middleware, nil }, Topology: topology.NewFullyConnectedTopology(), SubscriptionManager: subscriptionManager, diff --git a/follower/follower_builder.go b/follower/follower_builder.go index 27e69ce039c..4b561916fe4 100644 --- a/follower/follower_builder.go +++ b/follower/follower_builder.go @@ -380,6 +380,7 @@ func (builder *FollowerServiceBuilder) initNetwork(nodeID module.Local, IdentityProvider: builder.IdentityProvider, ReceiveCache: receiveCache, ConduitFactory: conduit.NewDefaultConduitFactory(), + SporkId: builder.SporkID, AlspCfg: &alspmgr.MisbehaviorReportManagerConfig{ Logger: builder.Logger, SpamRecordCacheSize: builder.FlowConfig.NetworkConfig.AlspConfig.SpamRecordCacheSize, @@ -758,7 +759,7 @@ func (builder *FollowerServiceBuilder) initMiddleware(nodeID flow.Identifier, Libp2pNode: libp2pNode, FlowId: nodeID, BitSwapMetrics: builder.Metrics.Bitswap, - RootBlockID: builder.SporkID, + SporkId: builder.SporkID, UnicastMessageTimeout: middleware.DefaultUnicastTimeout, IdTranslator: builder.IDTranslator, Codec: builder.CodecFactory(), diff --git a/insecure/fixtures.go b/insecure/fixtures.go index 3abe3392f8c..6292a7ac316 100644 --- a/insecure/fixtures.go +++ b/insecure/fixtures.go @@ -11,6 +11,7 @@ import ( "github.com/onflow/flow-go/model/libp2p/message" "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/channels" + flownetmsg "github.com/onflow/flow-go/network/message" "github.com/onflow/flow-go/utils/unittest" ) @@ -40,7 +41,7 @@ func EgressMessageFixture(t *testing.T, codec network.Codec, protocol Protocol, // encodes event to create payload payload, err := codec.Encode(content) require.NoError(t, err) - eventIDHash, err := network.EventId(channel, payload) + eventIDHash, err := flownetmsg.EventId(channel, payload) require.NoError(t, err) eventID := flow.HashToID(eventIDHash) diff --git a/insecure/integration/functional/test/gossipsub/rpc_inspector/validation_inspector_test.go b/insecure/integration/functional/test/gossipsub/rpc_inspector/validation_inspector_test.go index 8fe78d0f6df..882fda56750 100644 --- a/insecure/integration/functional/test/gossipsub/rpc_inspector/validation_inspector_test.go +++ b/insecure/integration/functional/test/gossipsub/rpc_inspector/validation_inspector_test.go @@ -1228,9 +1228,18 @@ func TestGossipSubSpamMitigationIntegration(t *testing.T) { // now we expect the detection and mitigation to kick in and the victim node to disconnect from the spammer node. // so the spammer and victim nodes should not be able to exchange messages on the topic. - p2ptest.EnsureNoPubsubExchangeBetweenGroups(t, ctx, []p2p.LibP2PNode{victimNode}, []p2p.LibP2PNode{spammer.SpammerNode}, blockTopic, 1, func() interface{} { - return unittest.ProposalFixture() - }) + p2ptest.EnsureNoPubsubExchangeBetweenGroups( + t, + ctx, + []p2p.LibP2PNode{victimNode}, + flow.IdentifierList{victimId.NodeID}, + []p2p.LibP2PNode{spammer.SpammerNode}, + flow.IdentifierList{spammer.SpammerId.NodeID}, + blockTopic, + 1, + func() interface{} { + return unittest.ProposalFixture() + }) } // mockDistributorReadyDoneAware mocks the Ready and Done methods of the distributor to return a channel that is already closed, diff --git a/insecure/integration/functional/test/gossipsub/scoring/ihave_spam_test.go b/insecure/integration/functional/test/gossipsub/scoring/ihave_spam_test.go index d9ba2cfcf24..ab484e8002d 100644 --- a/insecure/integration/functional/test/gossipsub/scoring/ihave_spam_test.go +++ b/insecure/integration/functional/test/gossipsub/scoring/ihave_spam_test.go @@ -302,9 +302,18 @@ func TestGossipSubIHaveBrokenPromises_Above_Threshold(t *testing.T) { require.Lessf(t, spammerScore, scoring.DefaultGraylistThreshold, "sanity check failed, the score of the spammer node must be less than graylist threshold: %f, actual: %f", scoring.DefaultGraylistThreshold, spammerScore) // since the spammer score is below the gossip, graylist and publish thresholds, it should not be able to exchange messages with victim anymore. - p2ptest.EnsureNoPubsubExchangeBetweenGroups(t, ctx, []p2p.LibP2PNode{spammer.SpammerNode}, []p2p.LibP2PNode{victimNode}, blockTopic, 1, func() interface{} { - return unittest.ProposalFixture() - }) + p2ptest.EnsureNoPubsubExchangeBetweenGroups( + t, + ctx, + []p2p.LibP2PNode{spammer.SpammerNode}, + flow.IdentifierList{spammer.SpammerId.NodeID}, + []p2p.LibP2PNode{victimNode}, + flow.IdentifierList{victimIdentity.NodeID}, + blockTopic, + 1, + func() interface{} { + return unittest.ProposalFixture() + }) } // spamIHaveBrokenPromises is a test utility function that is exclusive for the TestGossipSubIHaveBrokenPromises tests. diff --git a/insecure/integration/functional/test/gossipsub/scoring/scoring_test.go b/insecure/integration/functional/test/gossipsub/scoring/scoring_test.go index 910063110e9..e5920d4940d 100644 --- a/insecure/integration/functional/test/gossipsub/scoring/scoring_test.go +++ b/insecure/integration/functional/test/gossipsub/scoring/scoring_test.go @@ -173,9 +173,18 @@ func testGossipSubInvalidMessageDeliveryScoring(t *testing.T, spamMsgFactory fun // ensure that the topic snapshot of the spammer contains a record of at least (60%) of the spam messages sent. The 60% is to account for the messages that were delivered before the score was updated, after the spammer is PRUNED, as well as to account for decay. require.True(t, blkTopicSnapshot.InvalidMessageDeliveries > 0.6*float64(totalSpamMessages), "invalid message deliveries must be greater than %f. invalid message deliveries: %f", 0.9*float64(totalSpamMessages), blkTopicSnapshot.InvalidMessageDeliveries) - p2ptest.EnsureNoPubsubExchangeBetweenGroups(t, ctx, []p2p.LibP2PNode{victimNode}, []p2p.LibP2PNode{spammer.SpammerNode}, blockTopic, 1, func() interface{} { - return unittest.ProposalFixture() - }) + p2ptest.EnsureNoPubsubExchangeBetweenGroups( + t, + ctx, + []p2p.LibP2PNode{victimNode}, + flow.IdentifierList{victimIdentity.NodeID}, + []p2p.LibP2PNode{spammer.SpammerNode}, + flow.IdentifierList{spammer.SpammerId.NodeID}, + blockTopic, + 1, + func() interface{} { + return unittest.ProposalFixture() + }) } // TestGossipSubMeshDeliveryScoring_UnderDelivery_SingleTopic tests that when a peer is under-performing in a topic mesh, its score is (slightly) penalized. @@ -474,7 +483,7 @@ func TestGossipSubMeshDeliveryScoring_Replay_Will_Not_Counted(t *testing.T) { proposalList[i] = unittest.ProposalFixture() } i := -1 - p2ptest.EnsurePubsubMessageExchangeFromNode(t, ctx, replayingNode, thisNode, blockTopic, len(proposalList), func() interface{} { + p2ptest.EnsurePubsubMessageExchangeFromNode(t, ctx, replayingNode, thisNode, thisId.NodeID, blockTopic, len(proposalList), func() interface{} { i += 1 return proposalList[i] }) @@ -500,10 +509,18 @@ func TestGossipSubMeshDeliveryScoring_Replay_Will_Not_Counted(t *testing.T) { // now the replaying node acts maliciously and just replays the same messages again. i = -1 - p2ptest.EnsureNoPubsubMessageExchange(t, ctx, []p2p.LibP2PNode{replayingNode}, []p2p.LibP2PNode{thisNode}, blockTopic, len(proposalList), func() interface{} { - i += 1 - return proposalList[i] - }) + p2ptest.EnsureNoPubsubMessageExchange( + t, + ctx, + []p2p.LibP2PNode{replayingNode}, + []p2p.LibP2PNode{thisNode}, + flow.IdentifierList{thisId.NodeID}, + blockTopic, + len(proposalList), + func() interface{} { + i += 1 + return proposalList[i] + }) // since the last decay interval, the replaying node has not delivered anything new, so its score should be penalized for under-performing. require.Eventually(t, func() bool { diff --git a/insecure/orchestrator/network.go b/insecure/orchestrator/network.go index a41a4781c2c..11e24874e79 100644 --- a/insecure/orchestrator/network.go +++ b/insecure/orchestrator/network.go @@ -13,6 +13,7 @@ import ( "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/channels" + flownetmsg "github.com/onflow/flow-go/network/message" "github.com/onflow/flow-go/utils/logging" ) @@ -159,7 +160,7 @@ func (on *Network) processEgressMessage(message *insecure.EgressMessage) error { channel := channels.Channel(message.ChannelID) - egressEventIDHash, err := network.EventId(channel, message.Payload) + egressEventIDHash, err := flownetmsg.EventId(channel, message.Payload) if err != nil { return fmt.Errorf("could not create egress event ID: %w", err) } @@ -205,7 +206,7 @@ func (on *Network) processIngressMessage(message *insecure.IngressMessage) error defer on.orchestratorMutex.Unlock() channel := channels.Channel(message.ChannelID) - ingressEventIDHash, err := network.EventId(channel, message.Payload) + ingressEventIDHash, err := flownetmsg.EventId(channel, message.Payload) if err != nil { return fmt.Errorf("could not create ingress event ID: %w", err) } diff --git a/network/alsp/manager/manager_test.go b/network/alsp/manager/manager_test.go index 213d05fd1f4..a02bc5b3002 100644 --- a/network/alsp/manager/manager_test.go +++ b/network/alsp/manager/manager_test.go @@ -63,7 +63,7 @@ func TestNetworkPassesReportedMisbehavior(t *testing.T) { testutils.MiddlewareConfigFixture(t, sporkId), mocknetwork.NewViolationsConsumer(t)) - networkCfg := testutils.NetworkConfigFixture(t, *ids[0], ids, mws[0]) + networkCfg := testutils.NetworkConfigFixture(t, *ids[0], ids, sporkId, mws[0]) net, err := p2p.NewNetwork(networkCfg, p2p.WithAlspManager(misbehaviorReportManger)) require.NoError(t, err) @@ -126,7 +126,7 @@ func TestHandleReportedMisbehavior_Cache_Integration(t *testing.T) { nodes, testutils.MiddlewareConfigFixture(t, sporkId), mocknetwork.NewViolationsConsumer(t)) - networkCfg := testutils.NetworkConfigFixture(t, *ids[0], ids, mws[0], p2p.WithAlspConfig(cfg)) + networkCfg := testutils.NetworkConfigFixture(t, *ids[0], ids, sporkId, mws[0], p2p.WithAlspConfig(cfg)) net, err := p2p.NewNetwork(networkCfg) require.NoError(t, err) @@ -230,7 +230,7 @@ func TestHandleReportedMisbehavior_And_DisallowListing_Integration(t *testing.T) nodes, testutils.MiddlewareConfigFixture(t, sporkId), mocknetwork.NewViolationsConsumer(t)) - networkCfg := testutils.NetworkConfigFixture(t, *ids[0], ids, mws[0], p2p.WithAlspConfig(cfg)) + networkCfg := testutils.NetworkConfigFixture(t, *ids[0], ids, sporkId, mws[0], p2p.WithAlspConfig(cfg)) victimNetwork, err := p2p.NewNetwork(networkCfg) require.NoError(t, err) @@ -278,7 +278,7 @@ func TestHandleReportedMisbehavior_And_DisallowListing_Integration(t *testing.T) unittest.RequireReturnsBefore(t, wg.Wait, 100*time.Millisecond, "not all misbehavior reports have been processed") // ensures that the spammer is disallow-listed by the victim - p2ptest.RequireEventuallyNotConnected(t, []p2p.LibP2PNode{nodes[victimIndex]}, []p2p.LibP2PNode{nodes[spammerIndex]}, 100*time.Millisecond, 3*time.Second) + p2ptest.RequireEventuallyNotConnected(t, []p2p.LibP2PNode{nodes[victimIndex]}, []p2p.LibP2PNode{nodes[spammerIndex]}, 100*time.Millisecond, 5*time.Second) // despite disallow-listing spammer, ensure that (victim and honest) and (honest and spammer) are still connected. p2ptest.RequireConnectedEventually(t, []p2p.LibP2PNode{nodes[spammerIndex], nodes[honestIndex]}, 1*time.Millisecond, 100*time.Millisecond) @@ -323,7 +323,7 @@ func TestHandleReportedMisbehavior_And_DisallowListing_RepeatOffender_Integratio ids, nodes := testutils.LibP2PNodeForMiddlewareFixture(t, sporkId, 3, p2ptest.WithPeerManagerEnabled(p2ptest.PeerManagerConfigFixture(p2ptest.WithZeroJitterAndZeroBackoff(t)), nil)) mws, _ := testutils.MiddlewareFixtures(t, ids, nodes, testutils.MiddlewareConfigFixture(t, sporkId), mocknetwork.NewViolationsConsumer(t)) - networkCfg := testutils.NetworkConfigFixture(t, *ids[0], ids, mws[0], p2p.WithAlspConfig(cfg)) + networkCfg := testutils.NetworkConfigFixture(t, *ids[0], ids, sporkId, mws[0], p2p.WithAlspConfig(cfg)) victimNetwork, err := p2p.NewNetwork(networkCfg) require.NoError(t, err) @@ -482,6 +482,7 @@ func TestHandleReportedMisbehavior_And_SlashingViolationsConsumer_Integration(t t, *ids[0], ids, + sporkId, mws[0], p2p.WithAlspConfig(managerCfgFixture(t))) victimNetwork, err := p2p.NewNetwork(networkCfg) @@ -575,7 +576,7 @@ func TestMisbehaviorReportMetrics(t *testing.T) { sporkId := unittest.IdentifierFixture() ids, nodes := testutils.LibP2PNodeForMiddlewareFixture(t, sporkId, 1) mws, _ := testutils.MiddlewareFixtures(t, ids, nodes, testutils.MiddlewareConfigFixture(t, sporkId), mocknetwork.NewViolationsConsumer(t)) - networkCfg := testutils.NetworkConfigFixture(t, *ids[0], ids, mws[0], p2p.WithAlspConfig(cfg)) + networkCfg := testutils.NetworkConfigFixture(t, *ids[0], ids, sporkId, mws[0], p2p.WithAlspConfig(cfg)) net, err := p2p.NewNetwork(networkCfg) require.NoError(t, err) diff --git a/network/cache/rcvcache_test.go b/network/cache/rcvcache_test.go index 32551e1264f..28c65c52f72 100644 --- a/network/cache/rcvcache_test.go +++ b/network/cache/rcvcache_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - "github.com/onflow/flow-go/network" + "github.com/onflow/flow-go/network/message" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -43,20 +43,20 @@ func (r *ReceiveCacheTestSuite) SetupTest() { // TestSingleElementAdd adds a single element to the cache and verifies its existence. func (r *ReceiveCacheTestSuite) TestSingleElementAdd() { - eventID, err := network.EventId(channels.Channel("0"), []byte("event-1")) + eventID, err := message.EventId(channels.Channel("0"), []byte("event-1")) require.NoError(r.T(), err) assert.True(r.Suite.T(), r.c.Add(eventID)) assert.False(r.Suite.T(), r.c.Add(eventID)) // same channel but different event should be treated as unseen - eventID2, err := network.EventId(channels.Channel("0"), []byte("event-2")) + eventID2, err := message.EventId(channels.Channel("0"), []byte("event-2")) require.NoError(r.T(), err) assert.True(r.Suite.T(), r.c.Add(eventID2)) assert.False(r.Suite.T(), r.c.Add(eventID2)) // same event but different channels should be treated as unseen - eventID3, err := network.EventId(channels.Channel("1"), []byte("event-2")) + eventID3, err := message.EventId(channels.Channel("1"), []byte("event-2")) require.NoError(r.T(), err) assert.True(r.Suite.T(), r.c.Add(eventID3)) assert.False(r.Suite.T(), r.c.Add(eventID3)) @@ -64,7 +64,7 @@ func (r *ReceiveCacheTestSuite) TestSingleElementAdd() { // TestNoneExistence evaluates the correctness of cache operation against non-existing element func (r *ReceiveCacheTestSuite) TestNoneExistence() { - eventID, err := network.EventId(channels.Channel("1"), []byte("non-existing event")) + eventID, err := message.EventId(channels.Channel("1"), []byte("non-existing event")) require.NoError(r.T(), err) // adding new event to cache should return true @@ -76,7 +76,7 @@ func (r *ReceiveCacheTestSuite) TestMultipleElementAdd() { // creates and populates slice of 10 events eventIDs := make([]hash.Hash, 0) for i := 0; i < r.size; i++ { - eventID, err := network.EventId(channels.Channel("1"), []byte(fmt.Sprintf("event-%d", i))) + eventID, err := message.EventId(channels.Channel("1"), []byte(fmt.Sprintf("event-%d", i))) require.NoError(r.T(), err) eventIDs = append(eventIDs, eventID) @@ -114,7 +114,7 @@ func (r *ReceiveCacheTestSuite) TestLRU() { eventIDs := make([]hash.Hash, 0) total := r.size + 1 for i := 0; i < total; i++ { - eventID, err := network.EventId(channels.Channel("1"), []byte(fmt.Sprintf("event-%d", i))) + eventID, err := message.EventId(channels.Channel("1"), []byte(fmt.Sprintf("event-%d", i))) require.NoError(r.T(), err) eventIDs = append(eventIDs, eventID) diff --git a/network/internal/testutils/testUtil.go b/network/internal/testutils/testUtil.go index 927e1666076..c9fe52f2f4e 100644 --- a/network/internal/testutils/testUtil.go +++ b/network/internal/testutils/testUtil.go @@ -154,7 +154,7 @@ func MiddlewareConfigFixture(t *testing.T, sporkId flow.Identifier) *middleware. return &middleware.Config{ Logger: unittest.Logger(), BitSwapMetrics: metrics.NewNoopCollector(), - RootBlockID: sporkId, + SporkId: sporkId, UnicastMessageTimeout: middleware.DefaultUnicastTimeout, Codec: unittest.NetworkCodec(), } @@ -191,6 +191,7 @@ func MiddlewareFixtures(t *testing.T, identities flow.IdentityList, libP2PNodes // NetworksFixture generates the network for the given middlewares func NetworksFixture(t *testing.T, + sporkId flow.Identifier, ids flow.IdentityList, mws []network.Middleware) []network.Network { @@ -199,7 +200,7 @@ func NetworksFixture(t *testing.T, for i := 0; i < count; i++ { - params := NetworkConfigFixture(t, *ids[i], ids, mws[i]) + params := NetworkConfigFixture(t, *ids[i], ids, sporkId, mws[i]) net, err := p2p.NewNetwork(params) require.NoError(t, err) @@ -213,6 +214,7 @@ func NetworkConfigFixture( t *testing.T, myId flow.Identity, allIds flow.IdentityList, + sporkId flow.Identifier, mw network.Middleware, opts ...p2p.NetworkConfigOption) *p2p.NetworkConfig { @@ -240,6 +242,7 @@ func NetworkConfigFixture( IdentityProvider: id.NewFixedIdentityProvider(allIds), ReceiveCache: receiveCache, ConduitFactory: conduit.NewDefaultConduitFactory(), + SporkId: sporkId, AlspCfg: &alspmgr.MisbehaviorReportManagerConfig{ Logger: unittest.Logger(), SpamRecordCacheSize: defaultFlowConfig.NetworkConfig.AlspConfig.SpamRecordCacheSize, diff --git a/network/message/message_scope.go b/network/message/message_scope.go new file mode 100644 index 00000000000..fa4b0d667ef --- /dev/null +++ b/network/message/message_scope.go @@ -0,0 +1,205 @@ +package message + +import ( + "fmt" + "strings" + + "github.com/onflow/flow-go/crypto/hash" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/network/channels" +) + +const ( + // eventIDPackingPrefix is used as a salt to generate payload hash for messages. + eventIDPackingPrefix = "libp2ppacking" +) + +// EventId computes the event ID for a given channel and payload (i.e., the hash of the payload and channel). +// All errors returned by this function are benign and should not cause the node to crash. +// It errors if the hash function fails to hash the payload and channel. +func EventId(channel channels.Channel, payload []byte) (hash.Hash, error) { + // use a hash with an engine-specific salt to get the payload hash + h := hash.NewSHA3_384() + _, err := h.Write([]byte(eventIDPackingPrefix + channel)) + if err != nil { + return nil, fmt.Errorf("could not hash channel as salt: %w", err) + } + + _, err = h.Write(payload) + if err != nil { + return nil, fmt.Errorf("could not hash event: %w", err) + } + + return h.SumHash(), nil +} + +// MessageType returns the type of the message payload. +func MessageType(decodedPayload interface{}) string { + return strings.TrimLeft(fmt.Sprintf("%T", decodedPayload), "*") +} + +// IncomingMessageScope captures the context around an incoming message that is received by the network layer. +type IncomingMessageScope struct { + originId flow.Identifier // the origin node ID. + targetIds flow.IdentifierList // the target node IDs (i.e., intended recipients). + eventId hash.Hash // hash of the payload and channel. + msg *Message // the raw message received. + decodedPayload interface{} // decoded payload of the message. + protocol ProtocolType // the type of protocol used to receive the message. +} + +// NewIncomingScope creates a new incoming message scope. +// All errors returned by this function are benign and should not cause the node to crash, especially that it is not +// safe to crash the node when receiving a message. +// It errors if event id (i.e., hash of the payload and channel) cannot be computed, or if it fails to +// convert the target IDs from bytes slice to a flow.IdentifierList. +func NewIncomingScope(originId flow.Identifier, protocol ProtocolType, msg *Message, decodedPayload interface{}) (*IncomingMessageScope, error) { + eventId, err := EventId(channels.Channel(msg.ChannelID), msg.Payload) + if err != nil { + return nil, fmt.Errorf("could not compute event id: %w", err) + } + + targetIds, err := flow.ByteSlicesToIds(msg.TargetIDs) + if err != nil { + return nil, fmt.Errorf("could not convert target ids: %w", err) + } + return &IncomingMessageScope{ + eventId: eventId, + originId: originId, + msg: msg, + decodedPayload: decodedPayload, + protocol: protocol, + targetIds: targetIds, + }, nil +} + +func (m IncomingMessageScope) OriginId() flow.Identifier { + return m.originId +} + +func (m IncomingMessageScope) Proto() *Message { + return m.msg +} + +func (m IncomingMessageScope) DecodedPayload() interface{} { + return m.decodedPayload +} + +func (m IncomingMessageScope) Protocol() ProtocolType { + return m.protocol +} + +func (m IncomingMessageScope) Channel() channels.Channel { + return channels.Channel(m.msg.ChannelID) +} + +func (m IncomingMessageScope) Size() int { + return m.msg.Size() +} + +func (m IncomingMessageScope) TargetIDs() flow.IdentifierList { + return m.targetIds +} + +func (m IncomingMessageScope) EventID() []byte { + return m.eventId[:] +} + +func (m IncomingMessageScope) PayloadType() string { + return MessageType(m.decodedPayload) +} + +// OutgoingMessageScope captures the context around an outgoing message that is about to be sent. +type OutgoingMessageScope struct { + targetIds flow.IdentifierList // the target node IDs. + topic channels.Topic // the topic, i.e., channel-id/spork-id. + payload interface{} // the payload to be sent. + encoder func(interface{}) ([]byte, error) // the encoder to encode the payload. + msg *Message // raw proto message sent on wire. + protocol ProtocolType // the type of protocol used to send the message. +} + +// NewOutgoingScope creates a new outgoing message scope. +// All errors returned by this function are benign and should not cause the node to crash. +// It errors if the encoder fails to encode the payload into a protobuf message, or +// if the number of target IDs does not match the protocol type (i.e., unicast messages +// should have exactly one target ID, while pubsub messages should have at least one target ID). +func NewOutgoingScope( + targetIds flow.IdentifierList, + topic channels.Topic, + payload interface{}, + encoder func(interface{}) ([]byte, error), + protocolType ProtocolType) (*OutgoingMessageScope, error) { + scope := &OutgoingMessageScope{ + targetIds: targetIds, + topic: topic, + payload: payload, + encoder: encoder, + protocol: protocolType, + } + + if protocolType == ProtocolTypeUnicast { + // for unicast messages, we should have exactly one target. + if len(targetIds) != 1 { + return nil, fmt.Errorf("expected exactly one target id for unicast message, got: %d", len(targetIds)) + } + } + if protocolType == ProtocolTypePubSub { + // for pubsub messages, we should have at least one target. + if len(targetIds) == 0 { + return nil, fmt.Errorf("expected at least one target id for pubsub message, got: %d", len(targetIds)) + } + } + + msg, err := scope.buildMessage() + if err != nil { + return nil, fmt.Errorf("could not build message: %w", err) + } + scope.msg = msg + return scope, nil +} + +func (o OutgoingMessageScope) TargetIds() flow.IdentifierList { + return o.targetIds +} + +func (o OutgoingMessageScope) Size() int { + return o.msg.Size() +} + +func (o OutgoingMessageScope) PayloadType() string { + return MessageType(o.payload) +} + +func (o OutgoingMessageScope) Topic() channels.Topic { + return o.topic +} + +// buildMessage builds the raw proto message to be sent on the wire. +func (o OutgoingMessageScope) buildMessage() (*Message, error) { + payload, err := o.encoder(o.payload) + if err != nil { + return nil, fmt.Errorf("could not encode payload: %w", err) + } + + emTargets := make([][]byte, 0) + for _, targetId := range o.targetIds { + tempID := targetId // avoid capturing loop variable + emTargets = append(emTargets, tempID[:]) + } + + channel, ok := channels.ChannelFromTopic(o.topic) + if !ok { + return nil, fmt.Errorf("could not convert topic to channel: %s", o.topic) + } + + return &Message{ + TargetIDs: emTargets, + ChannelID: channel.String(), + Payload: payload, + }, nil +} + +func (o OutgoingMessageScope) Proto() *Message { + return o.msg +} diff --git a/network/message_scope.go b/network/message_scope.go index 3db13a1b2bd..4e4ded4b9cc 100644 --- a/network/message_scope.go +++ b/network/message_scope.go @@ -1,201 +1,57 @@ package network import ( - "fmt" - "strings" - - "github.com/onflow/flow-go/crypto/hash" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/network/message" ) -const ( - // eventIDPackingPrefix is used as a salt to generate payload hash for messages. - eventIDPackingPrefix = "libp2ppacking" -) - -// IncomingMessageScope captures the context around an incoming message that is received by the network layer. -type IncomingMessageScope struct { - originId flow.Identifier // the origin node ID. - targetIds flow.IdentifierList // the target node IDs (i.e., intended recipients). - eventId hash.Hash // hash of the payload and channel. - msg *message.Message // the raw message received. - decodedPayload interface{} // decoded payload of the message. - protocol message.ProtocolType // the type of protocol used to receive the message. -} - -// NewIncomingScope creates a new incoming message scope. -// All errors returned by this function are benign and should not cause the node to crash, especially that it is not -// safe to crash the node when receiving a message. -// It errors if event id (i.e., hash of the payload and channel) cannot be computed, or if it fails to -// convert the target IDs from bytes slice to a flow.IdentifierList. -func NewIncomingScope(originId flow.Identifier, protocol message.ProtocolType, msg *message.Message, decodedPayload interface{}) (*IncomingMessageScope, error) { - eventId, err := EventId(channels.Channel(msg.ChannelID), msg.Payload) - if err != nil { - return nil, fmt.Errorf("could not compute event id: %w", err) - } - - targetIds, err := flow.ByteSlicesToIds(msg.TargetIDs) - if err != nil { - return nil, fmt.Errorf("could not convert target ids: %w", err) - } - return &IncomingMessageScope{ - eventId: eventId, - originId: originId, - msg: msg, - decodedPayload: decodedPayload, - protocol: protocol, - targetIds: targetIds, - }, nil -} - -func (m IncomingMessageScope) OriginId() flow.Identifier { - return m.originId -} - -func (m IncomingMessageScope) Proto() *message.Message { - return m.msg -} - -func (m IncomingMessageScope) DecodedPayload() interface{} { - return m.decodedPayload -} - -func (m IncomingMessageScope) Protocol() message.ProtocolType { - return m.protocol -} - -func (m IncomingMessageScope) Channel() channels.Channel { - return channels.Channel(m.msg.ChannelID) -} - -func (m IncomingMessageScope) Size() int { - return m.msg.Size() -} - -func (m IncomingMessageScope) TargetIDs() flow.IdentifierList { - return m.targetIds -} - -func (m IncomingMessageScope) EventID() []byte { - return m.eventId[:] -} - -func (m IncomingMessageScope) PayloadType() string { - return MessageType(m.decodedPayload) -} - -// OutgoingMessageScope captures the context around an outgoing message that is about to be sent. -type OutgoingMessageScope struct { - targetIds flow.IdentifierList // the target node IDs. - channelId channels.Channel // the channel ID. - payload interface{} // the payload to be sent. - encoder func(interface{}) ([]byte, error) // the encoder to encode the payload. - msg *message.Message // raw proto message sent on wire. - protocol message.ProtocolType // the type of protocol used to send the message. -} +// IncomingMessageScope defines the interface for incoming message scopes, i.e., self-contained messages that have been +// received on the wire and are ready to be processed. +type IncomingMessageScope interface { + // OriginId returns the origin node ID. + OriginId() flow.Identifier -// NewOutgoingScope creates a new outgoing message scope. -// All errors returned by this function are benign and should not cause the node to crash. -// It errors if the encoder fails to encode the payload into a protobuf message, or -// if the number of target IDs does not match the protocol type (i.e., unicast messages -// should have exactly one target ID, while pubsub messages should have at least one target ID). -func NewOutgoingScope( - targetIds flow.IdentifierList, - channelId channels.Channel, - payload interface{}, - encoder func(interface{}) ([]byte, error), - protocolType message.ProtocolType) (*OutgoingMessageScope, error) { - scope := &OutgoingMessageScope{ - targetIds: targetIds, - channelId: channelId, - payload: payload, - encoder: encoder, - protocol: protocolType, - } + // Proto returns the raw message received. + Proto() *message.Message - if protocolType == message.ProtocolTypeUnicast { - // for unicast messages, we should have exactly one target. - if len(targetIds) != 1 { - return nil, fmt.Errorf("expected exactly one target id for unicast message, got: %d", len(targetIds)) - } - } - if protocolType == message.ProtocolTypePubSub { - // for pubsub messages, we should have at least one target. - if len(targetIds) == 0 { - return nil, fmt.Errorf("expected at least one target id for pubsub message, got: %d", len(targetIds)) - } - } + // DecodedPayload returns the decoded payload of the message. + DecodedPayload() interface{} - msg, err := scope.buildMessage() - if err != nil { - return nil, fmt.Errorf("could not build message: %w", err) - } - scope.msg = msg - return scope, nil -} + // Protocol returns the type of protocol used to receive the message. + Protocol() message.ProtocolType -func (o OutgoingMessageScope) TargetIds() flow.IdentifierList { - return o.targetIds -} + // Channel returns the channel of the message. + Channel() channels.Channel -func (o OutgoingMessageScope) Size() int { - return o.msg.Size() -} + // Size returns the size of the message. + Size() int -func (o OutgoingMessageScope) PayloadType() string { - return MessageType(o.payload) -} + // TargetIDs returns the target node IDs, i.e., the intended recipients. + TargetIDs() flow.IdentifierList -func (o OutgoingMessageScope) Channel() channels.Channel { - return o.channelId -} + // EventID returns the hash of the payload and channel. + EventID() []byte -// buildMessage builds the raw proto message to be sent on the wire. -func (o OutgoingMessageScope) buildMessage() (*message.Message, error) { - payload, err := o.encoder(o.payload) - if err != nil { - return nil, fmt.Errorf("could not encode payload: %w", err) - } - - emTargets := make([][]byte, 0) - for _, targetId := range o.targetIds { - tempID := targetId // avoid capturing loop variable - emTargets = append(emTargets, tempID[:]) - } - - return &message.Message{ - TargetIDs: emTargets, - ChannelID: o.channelId.String(), - Payload: payload, - }, nil + // PayloadType returns the type of the decoded payload. + PayloadType() string } -func (o OutgoingMessageScope) Proto() *message.Message { - return o.msg -} +// OutgoingMessageScope defines the interface for building outgoing message scopes, i.e., self-contained messages +// that are ready to be sent on the wire. +type OutgoingMessageScope interface { + // TargetIds returns the target node IDs. + TargetIds() flow.IdentifierList -// EventId computes the event ID for a given channel and payload (i.e., the hash of the payload and channel). -// All errors returned by this function are benign and should not cause the node to crash. -// It errors if the hash function fails to hash the payload and channel. -func EventId(channel channels.Channel, payload []byte) (hash.Hash, error) { - // use a hash with an engine-specific salt to get the payload hash - h := hash.NewSHA3_384() - _, err := h.Write([]byte(eventIDPackingPrefix + channel)) - if err != nil { - return nil, fmt.Errorf("could not hash channel as salt: %w", err) - } + // Size returns the size of the message. + Size() int - _, err = h.Write(payload) - if err != nil { - return nil, fmt.Errorf("could not hash event: %w", err) - } + // PayloadType returns the type of the payload to be sent. + PayloadType() string - return h.SumHash(), nil -} + // Topic returns the topic, i.e., channel-id/spork-id. + Topic() channels.Topic -// MessageType returns the type of the message payload. -func MessageType(decodedPayload interface{}) string { - return strings.TrimLeft(fmt.Sprintf("%T", decodedPayload), "*") + // Proto returns the raw proto message sent on the wire. + Proto() *message.Message } diff --git a/network/middleware.go b/network/middleware.go index d8e14ee82c1..a7ac5820b19 100644 --- a/network/middleware.go +++ b/network/middleware.go @@ -32,13 +32,13 @@ type Middleware interface { // Dispatch should be used whenever guaranteed delivery to a specific target is required. Otherwise, Publish is // a more efficient candidate. // All errors returned from this function can be considered benign. - SendDirect(msg *OutgoingMessageScope) error + SendDirect(msg OutgoingMessageScope) error // Publish publishes a message on the channel. It models a distributed broadcast where the message is meant for all or // a many nodes subscribing to the channel. It does not guarantee the delivery though, and operates on a best // effort. // All errors returned from this function can be considered benign. - Publish(msg *OutgoingMessageScope) error + Publish(msg OutgoingMessageScope) error // Subscribe subscribes the middleware to a channel. // No errors are expected during normal operation. @@ -57,8 +57,6 @@ type Middleware interface { // NewPingService creates a new PingService for the given ping protocol ID. NewPingService(pingProtocol protocol.ID, provider PingInfoProvider) PingService - - IsConnected(nodeID flow.Identifier) (bool, error) } // Overlay represents the interface that middleware uses to interact with the @@ -73,7 +71,7 @@ type Overlay interface { // Identity returns the Identity associated with the given peer ID, if it exists Identity(peer.ID) (*flow.Identity, bool) - Receive(*IncomingMessageScope) error + Receive(IncomingMessageScope) error } // Connection represents an interface to read from & write to a connection. diff --git a/network/mocknetwork/incoming_message_scope.go b/network/mocknetwork/incoming_message_scope.go new file mode 100644 index 00000000000..543c9da9796 --- /dev/null +++ b/network/mocknetwork/incoming_message_scope.go @@ -0,0 +1,168 @@ +// Code generated by mockery v2.21.4. DO NOT EDIT. + +package mocknetwork + +import ( + flow "github.com/onflow/flow-go/model/flow" + channels "github.com/onflow/flow-go/network/channels" + + message "github.com/onflow/flow-go/network/message" + + mock "github.com/stretchr/testify/mock" +) + +// IncomingMessageScope is an autogenerated mock type for the IncomingMessageScope type +type IncomingMessageScope struct { + mock.Mock +} + +// Channel provides a mock function with given fields: +func (_m *IncomingMessageScope) Channel() channels.Channel { + ret := _m.Called() + + var r0 channels.Channel + if rf, ok := ret.Get(0).(func() channels.Channel); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(channels.Channel) + } + + return r0 +} + +// DecodedPayload provides a mock function with given fields: +func (_m *IncomingMessageScope) DecodedPayload() interface{} { + ret := _m.Called() + + var r0 interface{} + if rf, ok := ret.Get(0).(func() interface{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(interface{}) + } + } + + return r0 +} + +// EventID provides a mock function with given fields: +func (_m *IncomingMessageScope) EventID() []byte { + ret := _m.Called() + + var r0 []byte + if rf, ok := ret.Get(0).(func() []byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + return r0 +} + +// OriginId provides a mock function with given fields: +func (_m *IncomingMessageScope) OriginId() flow.Identifier { + ret := _m.Called() + + var r0 flow.Identifier + if rf, ok := ret.Get(0).(func() flow.Identifier); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(flow.Identifier) + } + } + + return r0 +} + +// PayloadType provides a mock function with given fields: +func (_m *IncomingMessageScope) PayloadType() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// Proto provides a mock function with given fields: +func (_m *IncomingMessageScope) Proto() *message.Message { + ret := _m.Called() + + var r0 *message.Message + if rf, ok := ret.Get(0).(func() *message.Message); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*message.Message) + } + } + + return r0 +} + +// Protocol provides a mock function with given fields: +func (_m *IncomingMessageScope) Protocol() message.ProtocolType { + ret := _m.Called() + + var r0 message.ProtocolType + if rf, ok := ret.Get(0).(func() message.ProtocolType); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(message.ProtocolType) + } + + return r0 +} + +// Size provides a mock function with given fields: +func (_m *IncomingMessageScope) Size() int { + ret := _m.Called() + + var r0 int + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + +// TargetIDs provides a mock function with given fields: +func (_m *IncomingMessageScope) TargetIDs() flow.IdentifierList { + ret := _m.Called() + + var r0 flow.IdentifierList + if rf, ok := ret.Get(0).(func() flow.IdentifierList); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(flow.IdentifierList) + } + } + + return r0 +} + +type mockConstructorTestingTNewIncomingMessageScope interface { + mock.TestingT + Cleanup(func()) +} + +// NewIncomingMessageScope creates a new instance of IncomingMessageScope. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewIncomingMessageScope(t mockConstructorTestingTNewIncomingMessageScope) *IncomingMessageScope { + mock := &IncomingMessageScope{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/network/mocknetwork/middleware.go b/network/mocknetwork/middleware.go index 18cdaed21b0..2b0f7409df4 100644 --- a/network/mocknetwork/middleware.go +++ b/network/mocknetwork/middleware.go @@ -6,8 +6,6 @@ import ( datastore "github.com/ipfs/go-datastore" channels "github.com/onflow/flow-go/network/channels" - flow "github.com/onflow/flow-go/model/flow" - irrecoverable "github.com/onflow/flow-go/module/irrecoverable" mock "github.com/stretchr/testify/mock" @@ -38,30 +36,6 @@ func (_m *Middleware) Done() <-chan struct{} { return r0 } -// IsConnected provides a mock function with given fields: nodeID -func (_m *Middleware) IsConnected(nodeID flow.Identifier) (bool, error) { - ret := _m.Called(nodeID) - - var r0 bool - var r1 error - if rf, ok := ret.Get(0).(func(flow.Identifier) (bool, error)); ok { - return rf(nodeID) - } - if rf, ok := ret.Get(0).(func(flow.Identifier) bool); ok { - r0 = rf(nodeID) - } else { - r0 = ret.Get(0).(bool) - } - - if rf, ok := ret.Get(1).(func(flow.Identifier) error); ok { - r1 = rf(nodeID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // NewBlobService provides a mock function with given fields: channel, store, opts func (_m *Middleware) NewBlobService(channel channels.Channel, store datastore.Batching, opts ...network.BlobServiceOption) network.BlobService { _va := make([]interface{}, len(opts)) @@ -112,11 +86,11 @@ func (_m *Middleware) OnDisallowListNotification(_a0 *network.DisallowListingUpd } // Publish provides a mock function with given fields: msg -func (_m *Middleware) Publish(msg *network.OutgoingMessageScope) error { +func (_m *Middleware) Publish(msg network.OutgoingMessageScope) error { ret := _m.Called(msg) var r0 error - if rf, ok := ret.Get(0).(func(*network.OutgoingMessageScope) error); ok { + if rf, ok := ret.Get(0).(func(network.OutgoingMessageScope) error); ok { r0 = rf(msg) } else { r0 = ret.Error(0) @@ -142,11 +116,11 @@ func (_m *Middleware) Ready() <-chan struct{} { } // SendDirect provides a mock function with given fields: msg -func (_m *Middleware) SendDirect(msg *network.OutgoingMessageScope) error { +func (_m *Middleware) SendDirect(msg network.OutgoingMessageScope) error { ret := _m.Called(msg) var r0 error - if rf, ok := ret.Get(0).(func(*network.OutgoingMessageScope) error); ok { + if rf, ok := ret.Get(0).(func(network.OutgoingMessageScope) error); ok { r0 = rf(msg) } else { r0 = ret.Error(0) diff --git a/network/mocknetwork/outgoing_message_scope.go b/network/mocknetwork/outgoing_message_scope.go new file mode 100644 index 00000000000..f630c94e01a --- /dev/null +++ b/network/mocknetwork/outgoing_message_scope.go @@ -0,0 +1,106 @@ +// Code generated by mockery v2.21.4. DO NOT EDIT. + +package mocknetwork + +import ( + flow "github.com/onflow/flow-go/model/flow" + channels "github.com/onflow/flow-go/network/channels" + + message "github.com/onflow/flow-go/network/message" + + mock "github.com/stretchr/testify/mock" +) + +// OutgoingMessageScope is an autogenerated mock type for the OutgoingMessageScope type +type OutgoingMessageScope struct { + mock.Mock +} + +// PayloadType provides a mock function with given fields: +func (_m *OutgoingMessageScope) PayloadType() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// Proto provides a mock function with given fields: +func (_m *OutgoingMessageScope) Proto() *message.Message { + ret := _m.Called() + + var r0 *message.Message + if rf, ok := ret.Get(0).(func() *message.Message); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*message.Message) + } + } + + return r0 +} + +// Size provides a mock function with given fields: +func (_m *OutgoingMessageScope) Size() int { + ret := _m.Called() + + var r0 int + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + +// TargetIds provides a mock function with given fields: +func (_m *OutgoingMessageScope) TargetIds() flow.IdentifierList { + ret := _m.Called() + + var r0 flow.IdentifierList + if rf, ok := ret.Get(0).(func() flow.IdentifierList); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(flow.IdentifierList) + } + } + + return r0 +} + +// Topic provides a mock function with given fields: +func (_m *OutgoingMessageScope) Topic() channels.Topic { + ret := _m.Called() + + var r0 channels.Topic + if rf, ok := ret.Get(0).(func() channels.Topic); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(channels.Topic) + } + + return r0 +} + +type mockConstructorTestingTNewOutgoingMessageScope interface { + mock.TestingT + Cleanup(func()) +} + +// NewOutgoingMessageScope creates a new instance of OutgoingMessageScope. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewOutgoingMessageScope(t mockConstructorTestingTNewOutgoingMessageScope) *OutgoingMessageScope { + mock := &OutgoingMessageScope{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/network/mocknetwork/overlay.go b/network/mocknetwork/overlay.go index e36869114c1..7eab360e012 100644 --- a/network/mocknetwork/overlay.go +++ b/network/mocknetwork/overlay.go @@ -59,11 +59,11 @@ func (_m *Overlay) Identity(_a0 peer.ID) (*flow.Identity, bool) { } // Receive provides a mock function with given fields: _a0 -func (_m *Overlay) Receive(_a0 *network.IncomingMessageScope) error { +func (_m *Overlay) Receive(_a0 network.IncomingMessageScope) error { ret := _m.Called(_a0) var r0 error - if rf, ok := ret.Get(0).(func(*network.IncomingMessageScope) error); ok { + if rf, ok := ret.Get(0).(func(network.IncomingMessageScope) error); ok { r0 = rf(_a0) } else { r0 = ret.Error(0) diff --git a/network/p2p/connection/connection_gater_test.go b/network/p2p/connection/connection_gater_test.go index 4bd2314e509..7cec31f3d79 100644 --- a/network/p2p/connection/connection_gater_test.go +++ b/network/p2p/connection/connection_gater_test.go @@ -235,6 +235,7 @@ func TestConnectionGater_InterceptUpgrade(t *testing.T) { count := 5 nodes := make([]p2p.LibP2PNode, 0, count) inbounds := make([]chan string, 0, count) + identities := make(flow.IdentityList, 0, count) disallowedPeerIds := unittest.NewProtectedMap[peer.ID, struct{}]() allPeerIds := make(peer.IDSlice, 0, count) @@ -266,6 +267,7 @@ func TestConnectionGater_InterceptUpgrade(t *testing.T) { p2ptest.WithConnectionGater(connectionGater)) idProvider.On("ByPeerID", node.Host().ID()).Return(&id, true).Maybe() nodes = append(nodes, node) + identities = append(identities, &id) allPeerIds = append(allPeerIds, node.Host().ID()) inbounds = append(inbounds, inbound) } @@ -301,7 +303,7 @@ func TestConnectionGater_InterceptUpgrade(t *testing.T) { require.False(t, disallowedPeerIds.Has(remote)) }).Return(true, control.DisconnectReason(0)) - ensureCommunicationSilenceAmongGroups(t, ctx, sporkId, nodes[:1], nodes[1:]) + ensureCommunicationSilenceAmongGroups(t, ctx, sporkId, nodes[:1], identities[:1].NodeIDs(), nodes[1:], identities[1:].NodeIDs()) ensureCommunicationOverAllProtocols(t, ctx, sporkId, nodes[1:], inbounds[1:]) } @@ -380,28 +382,44 @@ func TestConnectionGater_Disallow_Integration(t *testing.T) { // let peer manager prune the connections to the disallow-listed node. time.Sleep(1 * time.Second) // ensures no connection, unicast, or pubsub going to or coming from the disallow-listed node. - ensureCommunicationSilenceAmongGroups(t, ctx, sporkId, nodes[:count-1], nodes[count-1:]) + ensureCommunicationSilenceAmongGroups(t, ctx, sporkId, nodes[:count-1], ids[:count-1].NodeIDs(), nodes[count-1:], ids[count-1:].NodeIDs()) // now we add another node (the second last node) to the disallowed list. disallowedList.Add(ids[len(ids)-2], struct{}{}) // let peer manager prune the connections to the disallow-listed node. time.Sleep(1 * time.Second) // ensures no connection, unicast, or pubsub going to and coming from the disallow-listed nodes. - ensureCommunicationSilenceAmongGroups(t, ctx, sporkId, nodes[:count-2], nodes[count-2:]) + ensureCommunicationSilenceAmongGroups(t, ctx, sporkId, nodes[:count-2], ids[:count-2].NodeIDs(), nodes[count-2:], ids[count-2:].NodeIDs()) // ensures that all nodes are other non-disallow-listed nodes can exchange messages over the pubsub and unicast. ensureCommunicationOverAllProtocols(t, ctx, sporkId, nodes[:count-2], inbounds[:count-2]) } // ensureCommunicationSilenceAmongGroups ensures no connection, unicast, or pubsub going to or coming from between the two groups of nodes. -func ensureCommunicationSilenceAmongGroups(t *testing.T, ctx context.Context, sporkId flow.Identifier, groupA []p2p.LibP2PNode, groupB []p2p.LibP2PNode) { +func ensureCommunicationSilenceAmongGroups( + t *testing.T, + ctx context.Context, + sporkId flow.Identifier, + groupANodes []p2p.LibP2PNode, + groupAIdentifiers flow.IdentifierList, + groupBNodes []p2p.LibP2PNode, + groupBIdentifiers flow.IdentifierList) { // ensures no connection, unicast, or pubsub going to the disallow-listed nodes - p2ptest.EnsureNotConnectedBetweenGroups(t, ctx, groupA, groupB) + p2ptest.EnsureNotConnectedBetweenGroups(t, ctx, groupANodes, groupBNodes) blockTopic := channels.TopicFromChannel(channels.PushBlocks, sporkId) - p2ptest.EnsureNoPubsubExchangeBetweenGroups(t, ctx, groupA, groupB, blockTopic, 1, func() interface{} { - return unittest.ProposalFixture() - }) - p2pfixtures.EnsureNoStreamCreationBetweenGroups(t, ctx, groupA, groupB) + p2ptest.EnsureNoPubsubExchangeBetweenGroups( + t, + ctx, + groupANodes, + groupAIdentifiers, + groupBNodes, + groupBIdentifiers, + blockTopic, + 1, + func() interface{} { + return unittest.ProposalFixture() + }) + p2pfixtures.EnsureNoStreamCreationBetweenGroups(t, ctx, groupANodes, groupBNodes) } // ensureCommunicationOverAllProtocols ensures that all nodes are connected to each other, and they can exchange messages over the pubsub and unicast. diff --git a/network/p2p/dht/dht_test.go b/network/p2p/dht/dht_test.go index 8f1f8cd942e..4845d025407 100644 --- a/network/p2p/dht/dht_test.go +++ b/network/p2p/dht/dht_test.go @@ -102,7 +102,6 @@ func TestPubSubWithDHTDiscovery(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) - topic := channels.Topic("/flow/" + unittest.IdentifierFixture().String()) count := 5 golog.SetAllLoggers(golog.LevelFatal) // change this to Debug if libp2p logs are needed @@ -122,6 +121,7 @@ func TestPubSubWithDHTDiscovery(t *testing.T) { // N4 N5 N4-----N5 sporkId := unittest.IdentifierFixture() + topic := channels.TopicFromChannel(channels.TestNetworkChannel, sporkId) idProvider := mockmodule.NewIdentityProvider(t) // create one node running the DHT Server (mimicking the staked AN) dhtServerNodes, serverIDs := p2ptest.NodesFixture(t, sporkId, "dht_test", 1, idProvider, p2ptest.WithDHTOptions(dht.AsServer())) @@ -155,18 +155,15 @@ func TestPubSubWithDHTDiscovery(t *testing.T) { // hence expect count and not count - 1 messages to be received (one by each node, including the sender) ch := make(chan peer.ID, count) - codec := unittest.NetworkCodec() - - payload, _ := codec.Encode(&libp2pmsg.TestMessage{}) - msg := &message.Message{ - Payload: payload, - } - - data, err := msg.Marshal() + messageScope, err := message.NewOutgoingScope( + ids.NodeIDs(), + topic, + &libp2pmsg.TestMessage{}, + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) require.NoError(t, err) logger := unittest.Logger() - topicValidator := flowpubsub.TopicValidator(logger, unittest.AllowAllPeerFilter()) for _, n := range nodes { s, err := n.Subscribe(topic, topicValidator) @@ -176,7 +173,7 @@ func TestPubSubWithDHTDiscovery(t *testing.T) { msg, err := s.Next(ctx) require.NoError(t, err) require.NotNil(t, msg) - assert.Equal(t, data, msg.Data) + assert.Equal(t, messageScope.Proto().Payload, msg.Data) ch <- nodeID }(s, n.Host().ID()) } @@ -196,7 +193,7 @@ func TestPubSubWithDHTDiscovery(t *testing.T) { require.Eventually(t, fullyConnectedGraph, time.Second*5, ticksForAssertEventually, "nodes failed to discover each other") // Step 4: publish a message to the topic - require.NoError(t, dhtServerNode.Publish(ctx, topic, data)) + require.NoError(t, dhtServerNode.Publish(ctx, messageScope)) // Step 5: By now, all peers would have been discovered and the message should have been successfully published // A hash set to keep track of the nodes who received the message @@ -221,6 +218,6 @@ loop: // Step 6: unsubscribes all nodes from the topic for _, n := range nodes { - assert.NoError(t, n.UnSubscribe(topic)) + assert.NoError(t, n.Unsubscribe(topic)) } } diff --git a/network/p2p/libp2pNode.go b/network/p2p/libp2pNode.go index 061e45a43ff..8426bd18ba6 100644 --- a/network/p2p/libp2pNode.go +++ b/network/p2p/libp2pNode.go @@ -64,10 +64,10 @@ type LibP2PNode interface { ListPeers(topic string) []peer.ID // Subscribe subscribes the node to the given topic and returns the subscription Subscribe(topic channels.Topic, topicValidator TopicValidatorFunc) (Subscription, error) - // UnSubscribe cancels the subscriber and closes the topic. - UnSubscribe(topic channels.Topic) error + // Unsubscribe cancels the subscriber and closes the topic corresponding to the given channel. + Unsubscribe(topic channels.Topic) error // Publish publishes the given payload on the topic. - Publish(ctx context.Context, topic channels.Topic, data []byte) error + Publish(ctx context.Context, messageScope network.OutgoingMessageScope) error // Host returns pointer to host object of node. Host() host.Host // WithDefaultUnicastProtocol overrides the default handler of the unicast manager and registers all preferred protocols. diff --git a/network/p2p/middleware/middleware.go b/network/p2p/middleware/middleware.go index ba8239363df..37cbce33c6e 100644 --- a/network/p2p/middleware/middleware.go +++ b/network/p2p/middleware/middleware.go @@ -30,7 +30,6 @@ import ( "github.com/onflow/flow-go/network/message" "github.com/onflow/flow-go/network/p2p" "github.com/onflow/flow-go/network/p2p/blob" - "github.com/onflow/flow-go/network/p2p/p2pnode" "github.com/onflow/flow-go/network/p2p/ping" "github.com/onflow/flow-go/network/p2p/unicast/protocols" "github.com/onflow/flow-go/network/p2p/unicast/ratelimit" @@ -89,7 +88,7 @@ type Middleware struct { libP2PNode p2p.LibP2PNode preferredUnicasts []protocols.ProtocolName bitswapMetrics module.BitswapMetrics - rootBlockID flow.Identifier + sporkId flow.Identifier validators []network.MessageValidator peerManagerFilters []p2p.PeerFilter unicastMessageTimeout time.Duration @@ -136,7 +135,7 @@ type Config struct { Libp2pNode p2p.LibP2PNode FlowId flow.Identifier // This node's Flow ID BitSwapMetrics module.BitswapMetrics - RootBlockID flow.Identifier + SporkId flow.Identifier UnicastMessageTimeout time.Duration IdTranslator p2p.IDTranslator Codec network.Codec @@ -166,7 +165,7 @@ func NewMiddleware(cfg *Config, opts ...OptionFn) *Middleware { log: cfg.Logger, libP2PNode: cfg.Libp2pNode, bitswapMetrics: cfg.BitSwapMetrics, - rootBlockID: cfg.RootBlockID, + sporkId: cfg.SporkId, validators: DefaultValidators(cfg.Logger, cfg.FlowId), unicastMessageTimeout: cfg.UnicastMessageTimeout, idTranslator: cfg.IdTranslator, @@ -359,7 +358,7 @@ func (m *Middleware) OnAllowListNotification(notification *network.AllowListingU // - failed to send message to peer. // // All errors returned from this function can be considered benign. -func (m *Middleware) SendDirect(msg *network.OutgoingMessageScope) error { +func (m *Middleware) SendDirect(msg network.OutgoingMessageScope) error { // since it is a unicast, we only need to get the first peer ID. peerID, err := m.idTranslator.GetPeerID(msg.TargetIds()[0]) if err != nil { @@ -382,7 +381,11 @@ func (m *Middleware) SendDirect(msg *network.OutgoingMessageScope) error { // protect the underlying connection from being inadvertently pruned by the peer manager while the stream and // connection creation is being attempted, and remove it from protected list once stream created. - tag := fmt.Sprintf("%v:%v", msg.Channel(), msg.PayloadType()) + channel, ok := channels.ChannelFromTopic(msg.Topic()) + if !ok { + return fmt.Errorf("could not find channel for topic %s", msg.Topic()) + } + tag := fmt.Sprintf("%v:%v", channel, msg.PayloadType()) m.libP2PNode.Host().ConnManager().Protect(peerID, tag) defer m.libP2PNode.Host().ConnManager().Unprotect(peerID, tag) @@ -509,7 +512,7 @@ func (m *Middleware) handleIncomingStream(s libp2pnetwork.Stream) { } channel := channels.Channel(msg.ChannelID) - topic := channels.TopicFromChannel(channel, m.rootBlockID) + topic := channels.TopicFromChannel(channel, m.sporkId) // ignore messages if node does not have subscription to topic if !m.libP2PNode.HasSubscription(topic) { @@ -556,7 +559,7 @@ func (m *Middleware) handleIncomingStream(s libp2pnetwork.Stream) { remotePeer, role, msg.Size(), - network.MessageType(msg.Payload), + message.MessageType(msg.Payload), channels.Topic(msg.ChannelID)) { return } @@ -574,8 +577,7 @@ func (m *Middleware) handleIncomingStream(s libp2pnetwork.Stream) { // Subscribe subscribes the middleware to a channel. // No errors are expected during normal operation. func (m *Middleware) Subscribe(channel channels.Channel) error { - - topic := channels.TopicFromChannel(channel, m.rootBlockID) + topic := channels.TopicFromChannel(channel, m.sporkId) var peerFilter p2p.PeerFilter var validators []validator.PubSubMessageValidator @@ -625,16 +627,8 @@ func (m *Middleware) processPubSubMessages(msg *message.Message, peerID peer.ID) // // All errors returned from this function can be considered benign. func (m *Middleware) Unsubscribe(channel channels.Channel) error { - topic := channels.TopicFromChannel(channel, m.rootBlockID) - err := m.libP2PNode.UnSubscribe(topic) - if err != nil { - return fmt.Errorf("failed to unsubscribe from channel (%s): %w", channel, err) - } - - // update peers to remove nodes subscribed to channel - m.libP2PNode.RequestPeerUpdate() - - return nil + topic := channels.TopicFromChannel(channel, m.sporkId) + return m.libP2PNode.Unsubscribe(topic) } // processUnicastStreamMessage will decode, perform authorized sender validation and process a message @@ -724,7 +718,7 @@ func (m *Middleware) processAuthenticatedMessage(msg *message.Message, peerID pe return } - scope, err := network.NewIncomingScope(originId, protocol, msg, decodedMsgPayload) + scope, err := message.NewIncomingScope(originId, protocol, msg, decodedMsgPayload) if err != nil { m.log.Error(). Err(err). @@ -738,7 +732,7 @@ func (m *Middleware) processAuthenticatedMessage(msg *message.Message, peerID pe } // processMessage processes a message and eventually passes it to the overlay -func (m *Middleware) processMessage(scope *network.IncomingMessageScope) { +func (m *Middleware) processMessage(scope network.IncomingMessageScope) { logger := m.log.With(). Str("channel", scope.Channel().String()). Str("type", scope.Protocol().String()). @@ -749,7 +743,7 @@ func (m *Middleware) processMessage(scope *network.IncomingMessageScope) { // run through all the message validators for _, v := range m.validators { // if any one fails, stop message propagation - if !v.Validate(*scope) { + if !v.Validate(scope) { logger.Debug().Msg("new message filtered by message validators") return } @@ -773,46 +767,9 @@ func (m *Middleware) processMessage(scope *network.IncomingMessageScope) { // - the libP2P node fails to publish the message. // // All errors returned from this function can be considered benign. -func (m *Middleware) Publish(msg *network.OutgoingMessageScope) error { - m.log.Debug(). - Str("channel", msg.Channel().String()). - Interface("msg", msg.Proto()). - Str("type", msg.PayloadType()). - Int("msg_size", msg.Size()). - Msg("publishing new message") - - // convert the message to bytes to be put on the wire. - data, err := msg.Proto().Marshal() - if err != nil { - return fmt.Errorf("failed to marshal the message: %w", err) - } - - msgSize := len(data) - if msgSize > p2pnode.DefaultMaxPubSubMsgSize { - // libp2p pubsub will silently drop the message if its size is greater than the configured pubsub max message size - // hence return an error as this message is undeliverable - return fmt.Errorf("message size %d exceeds configured max message size %d", msgSize, p2pnode.DefaultMaxPubSubMsgSize) - } - - topic := channels.TopicFromChannel(msg.Channel(), m.rootBlockID) - - // publish the bytes on the topic - err = m.libP2PNode.Publish(m.ctx, topic, data) - if err != nil { - return fmt.Errorf("failed to publish the message: %w", err) - } - - return nil -} - -// IsConnected returns true if this node is connected to the node with id nodeID. -// All errors returned from this function can be considered benign. -func (m *Middleware) IsConnected(nodeID flow.Identifier) (bool, error) { - peerID, err := m.idTranslator.GetPeerID(nodeID) - if err != nil { - return false, fmt.Errorf("could not find peer id for target id: %w", err) - } - return m.libP2PNode.IsConnected(peerID) +// TODO: DO NOT USE. Publish is ready to be removed from middleware. Use libp2pNode.Publish directly. +func (m *Middleware) Publish(msg network.OutgoingMessageScope) error { + return m.libP2PNode.Publish(m.ctx, msg) } // unicastMaxMsgSize returns the max permissible size for a unicast message diff --git a/network/p2p/middleware/middleware_test.go b/network/p2p/middleware/middleware_test.go index 9b9cc1dbc0e..b1708701418 100644 --- a/network/p2p/middleware/middleware_test.go +++ b/network/p2p/middleware/middleware_test.go @@ -8,7 +8,6 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/messages" - "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/network/message" "github.com/onflow/flow-go/network/p2p/middleware" @@ -18,9 +17,9 @@ import ( // TestChunkDataPackMaxMessageSize tests that the max message size for a chunk data pack response is set to the large message size. func TestChunkDataPackMaxMessageSize(t *testing.T) { // creates an outgoing chunk data pack response message (imitating an EN is sending a chunk data pack response to VN). - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{unittest.IdentifierFixture()}, - channels.ProvideChunks, + channels.TopicFromChannel(channels.ProvideChunks, unittest.IdentifierFixture()), &messages.ChunkDataResponse{ ChunkDataPack: *unittest.ChunkDataPackFixture(unittest.IdentifierFixture()), Nonce: rand.Uint64(), diff --git a/network/p2p/mock/lib_p2_p_node.go b/network/p2p/mock/lib_p2_p_node.go index 5813983110e..2fcdc336ad7 100644 --- a/network/p2p/mock/lib_p2_p_node.go +++ b/network/p2p/mock/lib_p2_p_node.go @@ -284,13 +284,13 @@ func (_m *LibP2PNode) PeerScoreExposer() p2p.PeerScoreExposer { return r0 } -// Publish provides a mock function with given fields: ctx, topic, data -func (_m *LibP2PNode) Publish(ctx context.Context, topic channels.Topic, data []byte) error { - ret := _m.Called(ctx, topic, data) +// Publish provides a mock function with given fields: ctx, messageScope +func (_m *LibP2PNode) Publish(ctx context.Context, messageScope flow_gonetwork.OutgoingMessageScope) error { + ret := _m.Called(ctx, messageScope) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, channels.Topic, []byte) error); ok { - r0 = rf(ctx, topic, data) + if rf, ok := ret.Get(0).(func(context.Context, flow_gonetwork.OutgoingMessageScope) error); ok { + r0 = rf(ctx, messageScope) } else { r0 = ret.Error(0) } @@ -430,8 +430,8 @@ func (_m *LibP2PNode) Subscribe(topic channels.Topic, topicValidator p2p.TopicVa return r0, r1 } -// UnSubscribe provides a mock function with given fields: topic -func (_m *LibP2PNode) UnSubscribe(topic channels.Topic) error { +// Unsubscribe provides a mock function with given fields: topic +func (_m *LibP2PNode) Unsubscribe(topic channels.Topic) error { ret := _m.Called(topic) var r0 error diff --git a/network/p2p/network.go b/network/p2p/network.go index a288b92c7d7..a3225f59925 100644 --- a/network/p2p/network.go +++ b/network/p2p/network.go @@ -40,6 +40,7 @@ var NotEjectedFilter = filter.Not(filter.Ejected) type Network struct { sync.RWMutex *component.ComponentManager + sporkId flow.Identifier identityProvider module.IdentityProvider logger zerolog.Logger codec network.Codec @@ -99,6 +100,7 @@ type NetworkConfig struct { ReceiveCache *netcache.ReceiveCache ConduitFactory network.ConduitFactory AlspCfg *alspmgr.MisbehaviorReportManagerConfig + SporkId flow.Identifier } // NetworkConfigOption is a function that can be used to override network config parmeters. @@ -167,6 +169,7 @@ func NewNetwork(param *NetworkConfig, opts ...NetworkOption) (*Network, error) { registerEngineRequests: make(chan *registerEngineRequest), registerBlobServiceRequests: make(chan *registerBlobServiceRequest), misbehaviorReportManager: misbehaviorMngr, + sporkId: param.SporkId, } for _, opt := range opts { @@ -375,7 +378,7 @@ func (n *Network) Identity(pid peer.ID) (*flow.Identity, bool) { return n.identityProvider.ByPeerID(pid) } -func (n *Network) Receive(msg *network.IncomingMessageScope) error { +func (n *Network) Receive(msg network.IncomingMessageScope) error { n.metrics.InboundMessageReceived(msg.Size(), msg.Channel().String(), msg.Protocol().String(), msg.PayloadType()) err := n.processNetworkMessage(msg) @@ -385,7 +388,7 @@ func (n *Network) Receive(msg *network.IncomingMessageScope) error { return nil } -func (n *Network) processNetworkMessage(msg *network.IncomingMessageScope) error { +func (n *Network) processNetworkMessage(msg network.IncomingMessageScope) error { // checks the cache for deduplication and adds the message if not already present if !n.receiveCache.Add(msg.EventID()) { // drops duplicate message @@ -426,9 +429,9 @@ func (n *Network) UnicastOnChannel(channel channels.Channel, payload interface{} return nil } - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{targetID}, - channel, + channels.TopicFromChannel(channel, n.sporkId), payload, n.codec.Encode, message.ProtocolTypeUnicast) @@ -436,14 +439,14 @@ func (n *Network) UnicastOnChannel(channel channels.Channel, payload interface{} return fmt.Errorf("could not generate outgoing message scope for unicast: %w", err) } - n.metrics.UnicastMessageSendingStarted(msg.Channel().String()) - defer n.metrics.UnicastMessageSendingCompleted(msg.Channel().String()) + n.metrics.UnicastMessageSendingStarted(channel.String()) + defer n.metrics.UnicastMessageSendingCompleted(channel.String()) err = n.mw.SendDirect(msg) if err != nil { return fmt.Errorf("failed to send message to %x: %w", targetID, err) } - n.metrics.OutboundMessageSent(msg.Size(), msg.Channel().String(), message.ProtocolTypeUnicast.String(), msg.PayloadType()) + n.metrics.OutboundMessageSent(msg.Size(), channel.String(), message.ProtocolTypeUnicast.String(), msg.PayloadType()) return nil } @@ -506,7 +509,12 @@ func (n *Network) sendOnChannel(channel channels.Channel, msg interface{}, targe Msg("sending new message on channel") // generate network message (encoding) based on list of recipients - scope, err := network.NewOutgoingScope(targetIDs, channel, msg, n.codec.Encode, message.ProtocolTypePubSub) + scope, err := message.NewOutgoingScope( + targetIDs, + channels.TopicFromChannel(channel, n.sporkId), + msg, + n.codec.Encode, + message.ProtocolTypePubSub) if err != nil { return fmt.Errorf("failed to generate outgoing message scope %s: %w", channel, err) } @@ -518,7 +526,7 @@ func (n *Network) sendOnChannel(channel channels.Channel, msg interface{}, targe return fmt.Errorf("failed to send message on channel %s: %w", channel, err) } - n.metrics.OutboundMessageSent(scope.Size(), scope.Channel().String(), message.ProtocolTypePubSub.String(), scope.PayloadType()) + n.metrics.OutboundMessageSent(scope.Size(), channel.String(), message.ProtocolTypePubSub.String(), scope.PayloadType()) return nil } diff --git a/network/p2p/p2pbuilder/libp2pNodeBuilder.go b/network/p2p/p2pbuilder/libp2pNodeBuilder.go index ac3f30fd4d0..7ded51080bc 100644 --- a/network/p2p/p2pbuilder/libp2pNodeBuilder.go +++ b/network/p2p/p2pbuilder/libp2pNodeBuilder.go @@ -44,13 +44,6 @@ import ( "github.com/onflow/flow-go/network/p2p/utils" ) -type GossipSubFactoryFunc func(context.Context, zerolog.Logger, host.Host, p2p.PubSubAdapterConfig) (p2p.PubSubAdapter, error) -type CreateNodeFunc func(logger zerolog.Logger, - host host.Host, - pCache *p2pnode.ProtocolPeerCache, - peerManager *connection.PeerManager) p2p.LibP2PNode -type GossipSubAdapterConfigFunc func(*p2p.BasePubSubAdapterConfig) p2p.PubSubAdapterConfig - type LibP2PNodeBuilder struct { gossipSubBuilder p2p.GossipSubBuilder sporkId flow.Identifier diff --git a/network/p2p/p2pnode/libp2pNode.go b/network/p2p/p2pnode/libp2pNode.go index 38746521430..9fcbcc9a466 100644 --- a/network/p2p/p2pnode/libp2pNode.go +++ b/network/p2p/p2pnode/libp2pNode.go @@ -76,10 +76,9 @@ func NewNode( peerManager p2p.PeerManager, disallowLstCacheCfg *p2p.DisallowListCacheConfig, ) *Node { - lg := logger.With().Str("component", "libp2p-node").Logger() return &Node{ host: host, - logger: lg, + logger: logger.With().Str("component", "libp2p-node").Logger(), topics: make(map[channels.Topic]p2p.Topic), subs: make(map[channels.Topic]p2p.Subscription), pCache: pCache, @@ -104,7 +103,7 @@ func (n *Node) Stop() error { n.logger.Debug().Msg("unsubscribing from all topics") for t := range n.topics { - err := n.UnSubscribe(t) + err := n.unsubscribeTopic(t) // context cancelled errors are expected while unsubscribing from topics during shutdown if err != nil && !errors.Is(err, context.Canceled) { result = multierror.Append(result, err) @@ -278,11 +277,35 @@ func (n *Node) Subscribe(topic channels.Topic, topicValidator p2p.TopicValidator return s, err } -// UnSubscribe cancels the subscriber and closes the topic. +// Unsubscribe cancels the subscriber and closes the topic. +// Args: +// topic: topic to unsubscribe from. +// Returns: +// error: error if any, which means unsubscribe failed. +// All errors returned from this function can be considered benign. +func (n *Node) Unsubscribe(topic channels.Topic) error { + err := n.unsubscribeTopic(topic) + if err != nil { + return fmt.Errorf("failed to unsubscribe from topic: %w", err) + } + + n.RequestPeerUpdate() + + return nil +} + +// unsubscribeTopic cancels the subscriber and closes the topic. // All errors returned from this function can be considered benign. -func (n *Node) UnSubscribe(topic channels.Topic) error { +// Args: +// +// topic: topic to unsubscribe from +// +// Returns: +// error: error if any. +func (n *Node) unsubscribeTopic(topic channels.Topic) error { n.Lock() defer n.Unlock() + // Remove the Subscriber from the cache if s, found := n.subs[topic]; found { s.Cancel() @@ -312,20 +335,43 @@ func (n *Node) UnSubscribe(topic channels.Topic) error { n.logger.Debug(). Str("topic", topic.String()). Msg("unsubscribed from topic") - return err + + return nil } // Publish publishes the given payload on the topic. // All errors returned from this function can be considered benign. -func (n *Node) Publish(ctx context.Context, topic channels.Topic, data []byte) error { - ps, found := n.topics[topic] +func (n *Node) Publish(ctx context.Context, messageScope flownet.OutgoingMessageScope) error { + lg := n.logger.With(). + Str("topic", messageScope.Topic().String()). + Interface("proto_message", messageScope.Proto()). + Str("payload_type", messageScope.PayloadType()). + Int("message_size", messageScope.Size()).Logger() + lg.Debug().Msg("received message to publish") + + // convert the message to bytes to be put on the wire. + data, err := messageScope.Proto().Marshal() + if err != nil { + return fmt.Errorf("failed to marshal the message: %w", err) + } + + msgSize := len(data) + if msgSize > DefaultMaxPubSubMsgSize { + // libp2p pubsub will silently drop the message if its size is greater than the configured pubsub max message size + // hence return an error as this message is undeliverable + return fmt.Errorf("message size %d exceeds configured max message size %d", msgSize, DefaultMaxPubSubMsgSize) + } + + ps, found := n.topics[messageScope.Topic()] if !found { - return fmt.Errorf("could not find topic (%s)", topic) + return fmt.Errorf("could not find topic (%s)", messageScope.Topic()) } - err := ps.Publish(ctx, data) + err = ps.Publish(ctx, data) if err != nil { - return fmt.Errorf("could not publish to topic (%s): %w", topic, err) + return fmt.Errorf("could not publish to topic (%s): %w", messageScope.Topic(), err) } + + lg.Debug().Msg("published message to topic") return nil } diff --git a/network/p2p/scoring/app_score_test.go b/network/p2p/scoring/app_score_test.go index 012378a874b..771a442a213 100644 --- a/network/p2p/scoring/app_score_test.go +++ b/network/p2p/scoring/app_score_test.go @@ -17,6 +17,7 @@ import ( "github.com/onflow/flow-go/module/mock" "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/network/internal/p2pfixtures" + "github.com/onflow/flow-go/network/message" "github.com/onflow/flow-go/network/p2p" "github.com/onflow/flow-go/network/p2p/scoring" p2ptest "github.com/onflow/flow-go/network/p2p/test" @@ -96,14 +97,22 @@ func TestFullGossipSubConnectivity(t *testing.T) { // checks end-to-end message delivery works // each node sends a distinct message to all and checks that all nodes receive it. for _, node := range nodes { - proposalMsg := p2pfixtures.MustEncodeEvent(t, unittest.ProposalFixture(), channels.PushBlocks) - require.NoError(t, node.Publish(ctx, blockTopic, proposalMsg)) + outgoingMessageScope, err := message.NewOutgoingScope( + ids.NodeIDs(), + channels.TopicFromChannel(channels.PushBlocks, sporkId), + unittest.ProposalFixture(), + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + require.NoError(t, node.Publish(ctx, outgoingMessageScope)) // checks that the message is received by all nodes. ctx1s, cancel1s := context.WithTimeout(ctx, 5*time.Second) - p2pfixtures.SubsMustReceiveMessage(t, ctx1s, proposalMsg, groupOneSubs) - p2pfixtures.SubsMustReceiveMessage(t, ctx1s, proposalMsg, accessNodeSubs) - p2pfixtures.SubsMustReceiveMessage(t, ctx1s, proposalMsg, groupTwoSubs) + expectedReceivedData, err := outgoingMessageScope.Proto().Marshal() + require.NoError(t, err) + p2pfixtures.SubsMustReceiveMessage(t, ctx1s, expectedReceivedData, groupOneSubs) + p2pfixtures.SubsMustReceiveMessage(t, ctx1s, expectedReceivedData, accessNodeSubs) + p2pfixtures.SubsMustReceiveMessage(t, ctx1s, expectedReceivedData, groupTwoSubs) cancel1s() } @@ -157,7 +166,7 @@ func TestFullGossipSubConnectivityAmongHonestNodesWithMaliciousMajority(t *testi ) allNodes := append([]p2p.LibP2PNode{con1Node, con2Node}, accessNodeGroup...) - allIds := append([]*flow.Identity{&con1Id, &con2Id}, accessNodeIds...) + allIds := append(flow.IdentityList{&con1Id, &con2Id}, accessNodeIds...) provider := id.NewFixedIdentityProvider(allIds) idProvider.On("ByPeerID", mocktestify.Anything).Return( diff --git a/network/p2p/scoring/scoring_test.go b/network/p2p/scoring/scoring_test.go index d71f6dd4556..cc2814b5399 100644 --- a/network/p2p/scoring/scoring_test.go +++ b/network/p2p/scoring/scoring_test.go @@ -144,7 +144,16 @@ func TestInvalidCtrlMsgScoringIntegration(t *testing.T) { } // checks no GossipSub message exchange should no longer happen between node1 and node2. - p2ptest.EnsureNoPubsubExchangeBetweenGroups(t, ctx, []p2p.LibP2PNode{node1}, []p2p.LibP2PNode{node2}, blockTopic, 1, func() interface{} { - return unittest.ProposalFixture() - }) + p2ptest.EnsureNoPubsubExchangeBetweenGroups( + t, + ctx, + []p2p.LibP2PNode{node1}, + flow.IdentifierList{id1.NodeID}, + []p2p.LibP2PNode{node2}, + flow.IdentifierList{id2.NodeID}, + blockTopic, + 1, + func() interface{} { + return unittest.ProposalFixture() + }) } diff --git a/network/p2p/scoring/subscription_validator_test.go b/network/p2p/scoring/subscription_validator_test.go index 7527b11c37b..338d26d67c5 100644 --- a/network/p2p/scoring/subscription_validator_test.go +++ b/network/p2p/scoring/subscription_validator_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/onflow/flow-go/network/message" "github.com/onflow/flow-go/network/p2p" p2ptest "github.com/onflow/flow-go/network/p2p/test" flowpubsub "github.com/onflow/flow-go/network/validator/pubsub" @@ -237,14 +238,23 @@ func TestSubscriptionValidator_Integration(t *testing.T) { // let the subscriptions be established time.Sleep(2 * time.Second) - proposalMsg := p2pfixtures.MustEncodeEvent(t, unittest.ProposalFixture(), channels.PushBlocks) - // consensus node publishes a proposal - require.NoError(t, conNode.Publish(ctx, blockTopic, proposalMsg)) + outgoingMessageScope, err := message.NewOutgoingScope( + ids.NodeIDs(), + channels.TopicFromChannel(channels.PushBlocks, sporkId), + unittest.ProposalFixture(), + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + require.NoError(t, conNode.Publish(ctx, outgoingMessageScope)) // checks that the message is received by all nodes. ctx1s, cancel1s := context.WithTimeout(ctx, 1*time.Second) defer cancel1s() - p2pfixtures.SubsMustReceiveMessage(t, ctx1s, proposalMsg, []p2p.Subscription{conSub, ver1SubBlocks, ver2SubBlocks}) + + expectedReceivedData, err := outgoingMessageScope.Proto().Marshal() + require.NoError(t, err) + + p2pfixtures.SubsMustReceiveMessage(t, ctx1s, expectedReceivedData, []p2p.Subscription{conSub, ver1SubBlocks, ver2SubBlocks}) // now consensus node is doing something very bad! // it is subscribing to a channel that it is not supposed to subscribe to. @@ -257,9 +267,14 @@ func TestSubscriptionValidator_Integration(t *testing.T) { // consensus node publishes another proposal, but this time, it should not reach verification node. // since upon an unauthorized subscription, verification node should have slashed consensus node on // the GossipSub scoring protocol. - proposalMsg = p2pfixtures.MustEncodeEvent(t, unittest.ProposalFixture(), channels.PushBlocks) - // publishes a message to the topic. - require.NoError(t, conNode.Publish(ctx, blockTopic, proposalMsg)) + outgoingMessageScope, err = message.NewOutgoingScope( + ids.NodeIDs(), + channels.TopicFromChannel(channels.PushBlocks, sporkId), + unittest.ProposalFixture(), + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + require.NoError(t, conNode.Publish(ctx, outgoingMessageScope)) ctx5s, cancel5s := context.WithTimeout(ctx, 5*time.Second) defer cancel5s() @@ -267,15 +282,25 @@ func TestSubscriptionValidator_Integration(t *testing.T) { // moreover, a verification node publishing a message to the request chunk topic should not reach consensus node. // however, both verification nodes should receive the message. - chunkDataPackRequestMsg := p2pfixtures.MustEncodeEvent(t, &messages.ChunkDataRequest{ - ChunkID: unittest.IdentifierFixture(), - Nonce: rand.Uint64(), - }, channels.RequestChunks) - require.NoError(t, verNode1.Publish(ctx, channels.TopicFromChannel(channels.RequestChunks, sporkId), chunkDataPackRequestMsg)) + outgoingMessageScope, err = message.NewOutgoingScope( + ids.NodeIDs(), + channels.TopicFromChannel(channels.RequestChunks, sporkId), + &messages.ChunkDataRequest{ + ChunkID: unittest.IdentifierFixture(), + Nonce: rand.Uint64(), + }, + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + require.NoError(t, verNode1.Publish(ctx, outgoingMessageScope)) ctx1s, cancel1s = context.WithTimeout(ctx, 1*time.Second) defer cancel1s() - p2pfixtures.SubsMustReceiveMessage(t, ctx1s, chunkDataPackRequestMsg, []p2p.Subscription{ver1SubChunks, ver2SubChunks}) + + expectedReceivedData, err = outgoingMessageScope.Proto().Marshal() + require.NoError(t, err) + + p2pfixtures.SubsMustReceiveMessage(t, ctx1s, expectedReceivedData, []p2p.Subscription{ver1SubChunks, ver2SubChunks}) ctx5s, cancel5s = context.WithTimeout(ctx, 5*time.Second) defer cancel5s() diff --git a/network/p2p/subscription/subscription_filter_test.go b/network/p2p/subscription/subscription_filter_test.go index 58dd211347d..849ff60ebb5 100644 --- a/network/p2p/subscription/subscription_filter_test.go +++ b/network/p2p/subscription/subscription_filter_test.go @@ -16,6 +16,7 @@ import ( "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/network/internal/p2pfixtures" + "github.com/onflow/flow-go/network/message" "github.com/onflow/flow-go/network/p2p" "github.com/onflow/flow-go/network/p2p/subscription" p2ptest "github.com/onflow/flow-go/network/p2p/test" @@ -78,16 +79,26 @@ func TestFilterSubscribe(t *testing.T) { wg.Add(2) testPublish := func(wg *sync.WaitGroup, from p2p.LibP2PNode, sub p2p.Subscription) { - data := []byte("hello") - err := from.Publish(context.TODO(), badTopic, data) + outgoingMessageScope, err := message.NewOutgoingScope( + ids.NodeIDs(), + channels.TopicFromChannel(channels.SyncCommittee, sporkId), + []byte("hello"), + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + + err = from.Publish(context.TODO(), outgoingMessageScope) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Second) msg, err := sub.Next(ctx) cancel() require.NoError(t, err) - require.Equal(t, msg.Data, data) + + expectedReceivedData, err := outgoingMessageScope.Proto().Marshal() + require.NoError(t, err) + require.Equal(t, msg.Data, expectedReceivedData) ctx, cancel = context.WithTimeout(context.Background(), time.Second) _, err = unstakedSub.Next(ctx) diff --git a/network/p2p/test/fixtures.go b/network/p2p/test/fixtures.go index fb74b426e80..bf424dd6524 100644 --- a/network/p2p/test/fixtures.go +++ b/network/p2p/test/fixtures.go @@ -30,6 +30,7 @@ import ( flownet "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/network/internal/p2pfixtures" + "github.com/onflow/flow-go/network/message" "github.com/onflow/flow-go/network/p2p" "github.com/onflow/flow-go/network/p2p/connection" p2pdht "github.com/onflow/flow-go/network/p2p/dht" @@ -615,19 +616,24 @@ func EnsurePubsubMessageExchange(t *testing.T, ctx context.Context, nodes []p2p. // let subscriptions propagate time.Sleep(1 * time.Second) - channel, ok := channels.ChannelFromTopic(topic) - require.True(t, ok) - for _, node := range nodes { for i := 0; i < count; i++ { // creates a unique message to be published by the node - msg := messageFactory() - data := p2pfixtures.MustEncodeEvent(t, msg, channel) - require.NoError(t, node.Publish(ctx, topic, data)) + payload := messageFactory() + outgoingMessageScope, err := message.NewOutgoingScope( + flow.IdentifierList{unittest.IdentifierFixture()}, + topic, + payload, + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + require.NoError(t, node.Publish(ctx, outgoingMessageScope)) // wait for the message to be received by all nodes ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - p2pfixtures.SubsMustReceiveMessage(t, ctx, data, subs) + expectedReceivedData, err := outgoingMessageScope.Proto().Marshal() + require.NoError(t, err) + p2pfixtures.SubsMustReceiveMessage(t, ctx, expectedReceivedData, subs) cancel() } } @@ -639,32 +645,46 @@ func EnsurePubsubMessageExchange(t *testing.T, ctx context.Context, nodes []p2p. // // - ctx: the context- the test will fail if the context expires. // - sender: the node that sends the message to the other node. -// - receiver: the node that receives the message from the other node. +// - receiverNode: the node that receives the message from the other node. +// - receiverIdentifier: the identifier of the receiver node. // - topic: the topic to exchange messages on. // - count: the number of messages to exchange from `sender` to `receiver`. // - messageFactory: a function that creates a unique message to be published by the node. -func EnsurePubsubMessageExchangeFromNode(t *testing.T, ctx context.Context, sender p2p.LibP2PNode, receiver p2p.LibP2PNode, topic channels.Topic, count int, messageFactory func() interface{}) { +func EnsurePubsubMessageExchangeFromNode( + t *testing.T, + ctx context.Context, + sender p2p.LibP2PNode, + receiverNode p2p.LibP2PNode, + receiverIdentifier flow.Identifier, + topic channels.Topic, + count int, + messageFactory func() interface{}) { _, err := sender.Subscribe(topic, validator.TopicValidator(unittest.Logger(), unittest.AllowAllPeerFilter())) require.NoError(t, err) - toSub, err := receiver.Subscribe(topic, validator.TopicValidator(unittest.Logger(), unittest.AllowAllPeerFilter())) + toSub, err := receiverNode.Subscribe(topic, validator.TopicValidator(unittest.Logger(), unittest.AllowAllPeerFilter())) require.NoError(t, err) // let subscriptions propagate time.Sleep(1 * time.Second) - channel, ok := channels.ChannelFromTopic(topic) - require.True(t, ok) - for i := 0; i < count; i++ { // creates a unique message to be published by the node - msg := messageFactory() - data := p2pfixtures.MustEncodeEvent(t, msg, channel) - require.NoError(t, sender.Publish(ctx, topic, data)) + payload := messageFactory() + outgoingMessageScope, err := message.NewOutgoingScope( + flow.IdentifierList{receiverIdentifier}, + topic, + payload, + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + require.NoError(t, sender.Publish(ctx, outgoingMessageScope)) // wait for the message to be received by all nodes ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - p2pfixtures.SubsMustReceiveMessage(t, ctx, data, []p2p.Subscription{toSub}) + expectedReceivedData, err := outgoingMessageScope.Proto().Marshal() + require.NoError(t, err) + p2pfixtures.SubsMustReceiveMessage(t, ctx, expectedReceivedData, []p2p.Subscription{toSub}) cancel() } } @@ -698,7 +718,15 @@ func EnsureNotConnectedBetweenGroups(t *testing.T, ctx context.Context, groupA [ // - topic: the topic to exchange messages on. // - count: the number of messages to exchange from each node. // - messageFactory: a function that creates a unique message to be published by the node. -func EnsureNoPubsubMessageExchange(t *testing.T, ctx context.Context, from []p2p.LibP2PNode, to []p2p.LibP2PNode, topic channels.Topic, count int, messageFactory func() interface{}) { +func EnsureNoPubsubMessageExchange( + t *testing.T, + ctx context.Context, + from []p2p.LibP2PNode, + to []p2p.LibP2PNode, + toIdentifiers flow.IdentifierList, + topic channels.Topic, + count int, + messageFactory func() interface{}) { subs := make([]p2p.Subscription, len(to)) tv := validator.TopicValidator( unittest.Logger(), @@ -725,13 +753,17 @@ func EnsureNoPubsubMessageExchange(t *testing.T, ctx context.Context, from []p2p wg.Add(1) go func() { // creates a unique message to be published by the node. - msg := messageFactory() - channel, ok := channels.ChannelFromTopic(topic) - require.True(t, ok) - data := p2pfixtures.MustEncodeEvent(t, msg, channel) - // ensure the message is NOT received by any of the nodes. - require.NoError(t, node.Publish(ctx, topic, data)) + payload := messageFactory() + outgoingMessageScope, err := message.NewOutgoingScope( + toIdentifiers, + topic, + payload, + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + require.NoError(t, node.Publish(ctx, outgoingMessageScope)) + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) p2pfixtures.SubsMustNeverReceiveAnyMessage(t, ctx, subs) cancel() @@ -749,16 +781,27 @@ func EnsureNoPubsubMessageExchange(t *testing.T, ctx context.Context, from []p2p // Args: // - t: *testing.T instance // - ctx: context.Context instance -// - groupA: first group of nodes- no message should be exchanged from any node of this group to the other group. -// - groupB: second group of nodes- no message should be exchanged from any node of this group to the other group. +// - groupANodes: first group of nodes- no message should be exchanged from any node of this group to the other group. +// - groupAIdentifiers: identifiers of the nodes in the first group. +// - groupBNodes: second group of nodes- no message should be exchanged from any node of this group to the other group. +// - groupBIdentifiers: identifiers of the nodes in the second group. // - topic: pubsub topic- no message should be exchanged on this topic. // - count: number of messages to be exchanged- no message should be exchanged. // - messageFactory: function to create a unique message to be published by the node. -func EnsureNoPubsubExchangeBetweenGroups(t *testing.T, ctx context.Context, groupA []p2p.LibP2PNode, groupB []p2p.LibP2PNode, topic channels.Topic, count int, messageFactory func() interface{}) { +func EnsureNoPubsubExchangeBetweenGroups( + t *testing.T, + ctx context.Context, + groupANodes []p2p.LibP2PNode, + groupAIdentifiers flow.IdentifierList, + groupBNodes []p2p.LibP2PNode, + groupBIdentifiers flow.IdentifierList, + topic channels.Topic, + count int, + messageFactory func() interface{}) { // ensure no message exchange from group A to group B - EnsureNoPubsubMessageExchange(t, ctx, groupA, groupB, topic, count, messageFactory) + EnsureNoPubsubMessageExchange(t, ctx, groupANodes, groupBNodes, groupBIdentifiers, topic, count, messageFactory) // ensure no message exchange from group B to group A - EnsureNoPubsubMessageExchange(t, ctx, groupB, groupA, topic, count, messageFactory) + EnsureNoPubsubMessageExchange(t, ctx, groupBNodes, groupANodes, groupAIdentifiers, topic, count, messageFactory) } // PeerIdSliceFixture returns a slice of random peer IDs for testing. diff --git a/network/p2p/test/sporking_test.go b/network/p2p/test/sporking_test.go index e01a0531e71..c3b34eaa09d 100644 --- a/network/p2p/test/sporking_test.go +++ b/network/p2p/test/sporking_test.go @@ -12,7 +12,6 @@ import ( "github.com/onflow/flow-go/model/flow" libp2pmessage "github.com/onflow/flow-go/model/libp2p/message" - "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/message" "github.com/onflow/flow-go/network/p2p" @@ -87,10 +86,12 @@ func TestCrosstalkPreventionOnNetworkKeyChange(t *testing.T) { require.NoError(t, err) // create stream from node 1 to node 2 - testOneToOneMessagingSucceeds(t, node1, peerInfo2) + node1.Host().Peerstore().AddAddrs(peerInfo2.ID, peerInfo2.Addrs, peerstore.AddressTTL) + s, err := node1.CreateStream(context.Background(), peerInfo2.ID) + require.NoError(t, err) + assert.NotNil(t, s) // Simulate a hard-spoon: node1 is on the old chain, but node2 is moved from the old chain to the new chain - // stop node 2 and start it again with a different networking key but on the same IP and port p2ptest.StopNode(t, node2, cancel2) @@ -151,8 +152,11 @@ func TestOneToOneCrosstalkPrevention(t *testing.T) { idProvider.SetIdentities(flow.IdentityList{&id1, &id2}) p2ptest.StartNode(t, signalerCtx2, node2) - // create stream from node 2 to node 1 - testOneToOneMessagingSucceeds(t, node2, peerInfo1) + // create stream from node 1 to node 2 + node2.Host().Peerstore().AddAddrs(peerInfo1.ID, peerInfo1.Addrs, peerstore.AddressTTL) + s, err := node2.CreateStream(context.Background(), peerInfo1.ID) + require.NoError(t, err) + assert.NotNil(t, s) // Simulate a hard-spoon: node1 is on the old chain, but node2 is moved from the old chain to the new chain // stop node 2 and start it again with a different libp2p protocol id to listen for @@ -237,54 +241,9 @@ func TestOneToKCrosstalkPrevention(t *testing.T) { time.Sleep(time.Second) // assert that node 1 can successfully send a message to node 2 via PubSub - testOneToKMessagingSucceeds(ctx, t, node1, sub2, topicBeforeSpork) - - // new root id after spork - rootIDAfterSpork := unittest.IdentifierFixture() - - // topic after the spork - topicAfterSpork := channels.TopicFromChannel(channels.TestNetworkChannel, rootIDAfterSpork) - - // mimic that node1 is now part of the new spork while node2 remains on the old spork - // by unsubscribing node1 from 'topicBeforeSpork' and subscribing it to 'topicAfterSpork' - // and keeping node2 subscribed to topic 'topicBeforeSpork' - err = node1.UnSubscribe(topicBeforeSpork) - require.NoError(t, err) - _, err = node1.Subscribe(topicAfterSpork, topicValidator) - require.NoError(t, err) - - // assert that node 1 can no longer send a message to node 2 via PubSub - testOneToKMessagingFails(ctx, t, node1, sub2, topicAfterSpork) -} - -func testOneToOneMessagingSucceeds(t *testing.T, sourceNode p2p.LibP2PNode, peerInfo peer.AddrInfo) { - // create stream from node 1 to node 2 - sourceNode.Host().Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.AddressTTL) - s, err := sourceNode.CreateStream(context.Background(), peerInfo.ID) - // assert that stream creation succeeded - require.NoError(t, err) - assert.NotNil(t, s) -} - -func testOneToOneMessagingFails(t *testing.T, sourceNode p2p.LibP2PNode, peerInfo peer.AddrInfo) { - // create stream from source node to destination address - sourceNode.Host().Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.AddressTTL) - _, err := sourceNode.CreateStream(context.Background(), peerInfo.ID) - // assert that stream creation failed - assert.Error(t, err) - // assert that it failed with the expected error - assert.Regexp(t, ".*failed to negotiate security protocol.*|.*protocols not supported.*", err) -} - -func testOneToKMessagingSucceeds(ctx context.Context, - t *testing.T, - sourceNode p2p.LibP2PNode, - dstnSub p2p.Subscription, - topic channels.Topic) { - - sentMsg, err := network.NewOutgoingScope( + outgoingMessageScope, err := message.NewOutgoingScope( flow.IdentifierList{unittest.IdentifierFixture()}, - channels.TestNetworkChannel, + topicBeforeSpork, &libp2pmessage.TestMessage{ Text: string("hello"), }, @@ -292,32 +251,40 @@ func testOneToKMessagingSucceeds(ctx context.Context, message.ProtocolTypePubSub) require.NoError(t, err) - sentData, err := sentMsg.Proto().Marshal() + expectedReceivedData, err := outgoingMessageScope.Proto().Marshal() require.NoError(t, err) // send a 1-k message from source node to destination node - err = sourceNode.Publish(ctx, topic, sentData) + err = node1.Publish(ctx, outgoingMessageScope) require.NoError(t, err) // assert that the message is received by the destination node unittest.AssertReturnsBefore(t, func() { - msg, err := dstnSub.Next(ctx) + msg, err := sub2.Next(ctx) require.NoError(t, err) - assert.Equal(t, sentData, msg.Data) + assert.Equal(t, expectedReceivedData, msg.Data) }, // libp2p hearbeats every second, so at most the message should take 1 second 2*time.Second) -} -func testOneToKMessagingFails(ctx context.Context, - t *testing.T, - sourceNode p2p.LibP2PNode, - dstnSub p2p.Subscription, - topic channels.Topic) { + // new root id after spork + rootIDAfterSpork := unittest.IdentifierFixture() - sentMsg, err := network.NewOutgoingScope( - flow.IdentifierList{unittest.IdentifierFixture()}, - channels.TestNetworkChannel, + // topic after the spork + topicAfterSpork := channels.TopicFromChannel(channels.TestNetworkChannel, rootIDAfterSpork) + + // mimic that node1 is now part of the new spork while node2 remains on the old spork + // by unsubscribing node1 from 'topicBeforeSpork' and subscribing it to 'topicAfterSpork' + // and keeping node2 subscribed to topic 'topicBeforeSpork' + err = node1.Unsubscribe(topicBeforeSpork) + require.NoError(t, err) + _, err = node1.Subscribe(topicAfterSpork, topicValidator) + require.NoError(t, err) + + // assert that node 1 can no longer send a message to node 2 via PubSub + outgoingMessageScope, err = message.NewOutgoingScope( + flow.IdentifierList{id2.NodeID}, + topicAfterSpork, &libp2pmessage.TestMessage{ Text: string("hello"), }, @@ -325,18 +292,25 @@ func testOneToKMessagingFails(ctx context.Context, message.ProtocolTypePubSub) require.NoError(t, err) - sentData, err := sentMsg.Proto().Marshal() - require.NoError(t, err) - // send a 1-k message from source node to destination node - err = sourceNode.Publish(ctx, topic, sentData) + err = node1.Publish(ctx, outgoingMessageScope) require.NoError(t, err) // assert that the message is never received by the destination node _ = unittest.RequireNeverReturnBefore(t, func() { - _, _ = dstnSub.Next(ctx) + _, _ = sub2.Next(ctx) }, // libp2p hearbeats every second, so at most the message should take 1 second 2*time.Second, "nodes on different sporks were able to communicate") } + +func testOneToOneMessagingFails(t *testing.T, sourceNode p2p.LibP2PNode, peerInfo peer.AddrInfo) { + // create stream from source node to destination address + sourceNode.Host().Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.AddressTTL) + _, err := sourceNode.CreateStream(context.Background(), peerInfo.ID) + // assert that stream creation failed + assert.Error(t, err) + // assert that it failed with the expected error + assert.Regexp(t, ".*failed to negotiate security protocol.*|.*protocols not supported.*", err) +} diff --git a/network/p2p/test/topic_validator_test.go b/network/p2p/test/topic_validator_test.go index 3a1c2ac7277..f5ea66d44a2 100644 --- a/network/p2p/test/topic_validator_test.go +++ b/network/p2p/test/topic_validator_test.go @@ -93,10 +93,16 @@ func TestTopicValidator_Unstaked(t *testing.T) { timedCtx, cancel5s := context.WithTimeout(ctx, 5*time.Second) defer cancel5s() - // create a dummy block proposal to publish from our SN node - data1 := p2pfixtures.MustEncodeEvent(t, unittest.ProposalFixture(), channel) - err = sn2.Publish(timedCtx, topic, data1) + outgoingMessageScope1, err := message.NewOutgoingScope( + flow.IdentifierList{identity1.NodeID, identity2.NodeID}, + topic, + unittest.ProposalFixture(), + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + + err = sn2.Publish(timedCtx, outgoingMessageScope1) require.NoError(t, err) // sn1 should not receive message from sn2 because sn2 is unstaked @@ -146,10 +152,16 @@ func TestTopicValidator_PublicChannel(t *testing.T) { timedCtx, cancel5s := context.WithTimeout(ctx, 5*time.Second) defer cancel5s() - // create a dummy sync request to publish from our SN node - data1 := p2pfixtures.MustEncodeEvent(t, &messages.SyncRequest{Nonce: 0, Height: 0}, channel) - err = sn2.Publish(timedCtx, topic, data1) + outgoingMessageScope1, err := message.NewOutgoingScope( + flow.IdentifierList{identity1.NodeID, identity2.NodeID}, + topic, + &messages.SyncRequest{Nonce: 0, Height: 0}, + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + + err = sn2.Publish(timedCtx, outgoingMessageScope1) require.NoError(t, err) var wg sync.WaitGroup @@ -158,11 +170,14 @@ func TestTopicValidator_PublicChannel(t *testing.T) { timedCtx, cancel1s := context.WithTimeout(ctx, time.Second) defer cancel1s() + expectedReceivedData, err := outgoingMessageScope1.Proto().Marshal() + require.NoError(t, err) + // sn1 gets the message - p2pfixtures.SubMustReceiveMessage(t, timedCtx, data1, sub1) + p2pfixtures.SubMustReceiveMessage(t, timedCtx, expectedReceivedData, sub1) // sn2 also gets the message (as part of the libp2p loopback of published topic messages) - p2pfixtures.SubMustReceiveMessage(t, timedCtx, data1, sub2) + p2pfixtures.SubMustReceiveMessage(t, timedCtx, expectedReceivedData, sub2) unittest.RequireReturnsBefore(t, wg.Wait, 5*time.Second, "could not receive message on time") } @@ -208,11 +223,20 @@ func TestTopicValidator_TopicMismatch(t *testing.T) { timedCtx, cancel5s := context.WithTimeout(ctx, 5*time.Second) defer cancel5s() + // create a dummy block proposal to publish from our SN node - data1 := p2pfixtures.MustEncodeEvent(t, unittest.ProposalFixture(), channels.Channel("invalid-channel")) + outgoingMessageScope1, err := message.NewOutgoingScope( + flow.IdentifierList{identity1.NodeID, identity2.NodeID}, + topic, + unittest.ProposalFixture(), + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) - err = sn2.Publish(timedCtx, topic, data1) + // intentionally overriding the channel id to be different from the topic + outgoingMessageScope1.Proto().ChannelID = channels.PublicSyncCommittee.String() + err = sn2.Publish(timedCtx, outgoingMessageScope1) // publish fails because the channel validation fails require.Error(t, err) @@ -260,14 +284,30 @@ func TestTopicValidator_InvalidTopic(t *testing.T) { timedCtx, cancel5s := context.WithTimeout(ctx, 5*time.Second) defer cancel5s() - // create a dummy block proposal to publish from our SN node - data1 := p2pfixtures.MustEncodeEvent(t, unittest.ProposalFixture(), channels.PushBlocks) - err = sn2.Publish(timedCtx, topic, data1) + // invalid topic is malformed, hence it cannot be used to create a message scope, as it faces an error. + // Hence, we create a dummy block proposal message scope to publish on a legit topic, and then override + // the topic in the next step to a malformed topic. + dummyMessageScope, err := message.NewOutgoingScope( + flow.IdentifierList{identity1.NodeID, identity2.NodeID}, + channels.TopicFromChannel(channels.PushBlocks, sporkId), + unittest.ProposalFixture(), + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + + // overrides the topic to be an invalid topic + corruptOutgoingMessageScope := mocknetwork.NewOutgoingMessageScope(t) + corruptOutgoingMessageScope.On("Topic").Return(topic) + corruptOutgoingMessageScope.On("Proto").Return(dummyMessageScope.Proto()) + corruptOutgoingMessageScope.On("PayloadType").Return(dummyMessageScope.PayloadType()) + corruptOutgoingMessageScope.On("Size").Return(dummyMessageScope.Size()) + + // create a dummy block proposal to publish from our SN node + err = sn2.Publish(timedCtx, corruptOutgoingMessageScope) // publish fails because the topic conversion fails require.Error(t, err) - // ensure the correct error is contained in the logged error require.Contains(t, hook.Logs(), "could not convert topic to channel") } @@ -345,34 +385,52 @@ func TestAuthorizedSenderValidator_Unauthorized(t *testing.T) { timedCtx, cancel5s := context.WithTimeout(ctx, 60*time.Second) defer cancel5s() - // create a dummy block proposal to publish from our SN node - data1 := p2pfixtures.MustEncodeEvent(t, unittest.ProposalFixture(), channel) // sn2 publishes the block proposal, sn1 and an1 should receive the message because // SN nodes are authorized to send block proposals - err = sn2.Publish(timedCtx, topic, data1) + // create a dummy block proposal to publish from our SN node + outgoingMessageScope1, err := message.NewOutgoingScope( + flow.IdentifierList{identity1.NodeID, identity2.NodeID}, + topic, + unittest.ProposalFixture(), + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + err = sn2.Publish(timedCtx, outgoingMessageScope1) + require.NoError(t, err) + + expectedReceivedData1, err := outgoingMessageScope1.Proto().Marshal() require.NoError(t, err) // sn1 gets the message - p2pfixtures.SubMustReceiveMessage(t, timedCtx, data1, sub1) + p2pfixtures.SubMustReceiveMessage(t, timedCtx, expectedReceivedData1, sub1) // sn2 also gets the message (as part of the libp2p loopback of published topic messages) - p2pfixtures.SubMustReceiveMessage(t, timedCtx, data1, sub2) + p2pfixtures.SubMustReceiveMessage(t, timedCtx, expectedReceivedData1, sub2) // an1 also gets the message - p2pfixtures.SubMustReceiveMessage(t, timedCtx, data1, sub3) + p2pfixtures.SubMustReceiveMessage(t, timedCtx, expectedReceivedData1, sub3) timedCtx, cancel2s := context.WithTimeout(ctx, 2*time.Second) defer cancel2s() - data2 := p2pfixtures.MustEncodeEvent(t, unittest.ProposalFixture(), channel) // the access node now publishes the block proposal message, AN are not authorized to publish block proposals // the message should be rejected by the topic validator on sn1 - err = an1.Publish(timedCtx, topic, data2) + outgoingMessageScope2, err := message.NewOutgoingScope( + flow.IdentifierList{identity1.NodeID, identity2.NodeID}, + topic, + unittest.ProposalFixture(), + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + err = an1.Publish(timedCtx, outgoingMessageScope2) + require.NoError(t, err) + + expectedReceivedData2, err := outgoingMessageScope2.Proto().Marshal() require.NoError(t, err) // an1 receives its own message - p2pfixtures.SubMustReceiveMessage(t, timedCtx, data2, sub3) + p2pfixtures.SubMustReceiveMessage(t, timedCtx, expectedReceivedData2, sub3) var wg sync.WaitGroup @@ -449,11 +507,17 @@ func TestAuthorizedSenderValidator_InvalidMsg(t *testing.T) { timedCtx, cancel5s := context.WithTimeout(ctx, 5*time.Second) defer cancel5s() - // create a dummy block proposal to publish from our SN node - data1 := p2pfixtures.MustEncodeEvent(t, unittest.ProposalFixture(), channel) + // create a dummy block proposal to publish from our SN node // sn2 publishes the block proposal on the sync committee channel - err = sn2.Publish(timedCtx, topic, data1) + outgoingMessageScope1, err := message.NewOutgoingScope( + flow.IdentifierList{identity1.NodeID, identity2.NodeID}, + topic, + unittest.ProposalFixture(), + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + err = sn2.Publish(timedCtx, outgoingMessageScope1) require.NoError(t, err) // sn1 should not receive message from sn2 @@ -532,29 +596,46 @@ func TestAuthorizedSenderValidator_Ejected(t *testing.T) { timedCtx, cancel5s := context.WithTimeout(ctx, 5*time.Second) defer cancel5s() - // create a dummy block proposal to publish from our SN node - data1 := p2pfixtures.MustEncodeEvent(t, unittest.ProposalFixture(), channel) // sn2 publishes the block proposal, sn1 and an1 should receive the message because // SN nodes are authorized to send block proposals - err = sn2.Publish(timedCtx, topic, data1) + // create a dummy block proposal to publish from our SN node + outgoingMessageScope1, err := message.NewOutgoingScope( + flow.IdentifierList{identity1.NodeID, identity2.NodeID}, + topic, + unittest.ProposalFixture(), + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + err = sn2.Publish(timedCtx, outgoingMessageScope1) + require.NoError(t, err) + + expectedReceivedData1, err := outgoingMessageScope1.Proto().Marshal() require.NoError(t, err) // sn1 gets the message - p2pfixtures.SubMustReceiveMessage(t, timedCtx, data1, sub1) + p2pfixtures.SubMustReceiveMessage(t, timedCtx, expectedReceivedData1, sub1) // sn2 also gets the message (as part of the libp2p loopback of published topic messages) - p2pfixtures.SubMustReceiveMessage(t, timedCtx, data1, sub2) + p2pfixtures.SubMustReceiveMessage(t, timedCtx, expectedReceivedData1, sub2) // an1 also gets the message - p2pfixtures.SubMustReceiveMessage(t, timedCtx, data1, sub3) + p2pfixtures.SubMustReceiveMessage(t, timedCtx, expectedReceivedData1, sub3) // "eject" sn2 to ensure messages published by ejected nodes get rejected identity2.Ejected = true - data3 := p2pfixtures.MustEncodeEvent(t, unittest.ProposalFixture(), channel) + + outgoingMessageScope3, err := message.NewOutgoingScope( + flow.IdentifierList{identity1.NodeID, identity2.NodeID}, + topic, + unittest.ProposalFixture(), + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) + timedCtx, cancel2s := context.WithTimeout(ctx, time.Second) defer cancel2s() - err = sn2.Publish(timedCtx, topic, data3) + err = sn2.Publish(timedCtx, outgoingMessageScope3) require.NoError(t, err) // sn1 should not receive rejected message from ejected sn2 @@ -627,19 +708,29 @@ func TestAuthorizedSenderValidator_ClusterChannel(t *testing.T) { timedCtx, cancel5s := context.WithTimeout(ctx, 5*time.Second) defer cancel5s() + // create a dummy sync request to publish from our LN node - data := p2pfixtures.MustEncodeEvent(t, &messages.RangeRequest{}, channel) + outgoingMessageScope1, err := message.NewOutgoingScope( + flow.IdentifierList{identity1.NodeID, identity2.NodeID}, + topic, + &messages.RangeRequest{}, + unittest.NetworkCodec().Encode, + message.ProtocolTypePubSub) + require.NoError(t, err) // ln2 publishes the sync request on the cluster channel - err = ln2.Publish(timedCtx, topic, data) + err = ln2.Publish(timedCtx, outgoingMessageScope1) + require.NoError(t, err) + + expectedReceivedData1, err := outgoingMessageScope1.Proto().Marshal() require.NoError(t, err) // ln1 gets the message - p2pfixtures.SubMustReceiveMessage(t, timedCtx, data, sub1) + p2pfixtures.SubMustReceiveMessage(t, timedCtx, expectedReceivedData1, sub1) // ln2 also gets the message (as part of the libp2p loopback of published topic messages) - p2pfixtures.SubMustReceiveMessage(t, timedCtx, data, sub2) + p2pfixtures.SubMustReceiveMessage(t, timedCtx, expectedReceivedData1, sub2) // ln3 also gets the message - p2pfixtures.SubMustReceiveMessage(t, timedCtx, data, sub3) + p2pfixtures.SubMustReceiveMessage(t, timedCtx, expectedReceivedData1, sub3) } diff --git a/network/p2p/tracer/gossipSubMeshTracer_test.go b/network/p2p/tracer/gossipSubMeshTracer_test.go index b0cf703681a..b883100f4c3 100644 --- a/network/p2p/tracer/gossipSubMeshTracer_test.go +++ b/network/p2p/tracer/gossipSubMeshTracer_test.go @@ -39,8 +39,10 @@ func TestGossipSubMeshTracer(t *testing.T) { idProvider := mockmodule.NewIdentityProvider(t) defer cancel() - topic1 := channels.TopicFromChannel(channels.PushBlocks, sporkId) - topic2 := channels.TopicFromChannel(channels.PushReceipts, sporkId) + channel1 := channels.PushBlocks + topic1 := channels.TopicFromChannel(channel1, sporkId) + channel2 := channels.PushReceipts + topic2 := channels.TopicFromChannel(channel2, sporkId) loggerCycle := atomic.NewInt32(0) warnLoggerCycle := atomic.NewInt32(0) @@ -176,9 +178,9 @@ func TestGossipSubMeshTracer(t *testing.T) { // all nodes except the tracerNode unsubscribe from the topic1, which triggers sending a PRUNE to the tracerNode for each unsubscription. // We expect the tracerNode to remove the otherNode1, otherNode2, and unknownNode from its mesh. - require.NoError(t, otherNode1.UnSubscribe(topic1)) - require.NoError(t, otherNode2.UnSubscribe(topic1)) - require.NoError(t, unknownNode.UnSubscribe(topic1)) + require.NoError(t, otherNode1.Unsubscribe(topic1)) + require.NoError(t, otherNode2.Unsubscribe(topic1)) + require.NoError(t, unknownNode.Unsubscribe(topic1)) assert.Eventually(t, func() bool { // eventually, the tracerNode should not have the other node in its mesh for topic1. diff --git a/network/p2p/unicast/ratelimit/bandwidth_rate_limiter_test.go b/network/p2p/unicast/ratelimit/bandwidth_rate_limiter_test.go index 0016ae49f63..1bcb28a46cc 100644 --- a/network/p2p/unicast/ratelimit/bandwidth_rate_limiter_test.go +++ b/network/p2p/unicast/ratelimit/bandwidth_rate_limiter_test.go @@ -6,7 +6,6 @@ import ( "github.com/onflow/flow-go/model/flow" libp2pmessage "github.com/onflow/flow-go/model/libp2p/message" - "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/message" "github.com/stretchr/testify/require" @@ -37,9 +36,10 @@ func TestBandWidthRateLimiter_Allow(t *testing.T) { b[i] = byte('X') } - msg, err := network.NewOutgoingScope( + sporkId := unittest.IdentifierFixture() + msg, err := message.NewOutgoingScope( flow.IdentifierList{unittest.IdentifierFixture()}, - channels.TestNetworkChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, sporkId), &libp2pmessage.TestMessage{ Text: string(b), }, @@ -90,9 +90,10 @@ func TestBandWidthRateLimiter_IsRateLimited(t *testing.T) { require.False(t, bandwidthRateLimiter.IsRateLimited(peerID)) - msg, err := network.NewOutgoingScope( + sporkId := unittest.IdentifierFixture() + msg, err := message.NewOutgoingScope( flow.IdentifierList{unittest.IdentifierFixture()}, - channels.TestNetworkChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, sporkId), &libp2pmessage.TestMessage{ Text: string(b), }, diff --git a/network/test/blob_service_test.go b/network/test/blob_service_test.go index 7e3c40f96d8..f42d74ab140 100644 --- a/network/test/blob_service_test.go +++ b/network/test/blob_service_test.go @@ -10,6 +10,7 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" blockstore "github.com/ipfs/go-ipfs-blockstore" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "go.uber.org/atomic" @@ -98,7 +99,7 @@ func (suite *BlobServiceTestSuite) SetupTest() { nodes, testutils.MiddlewareConfigFixture(suite.T(), sporkId), mocknetwork.NewViolationsConsumer(suite.T())) - suite.networks = testutils.NetworksFixture(suite.T(), ids, mws) + suite.networks = testutils.NetworksFixture(suite.T(), sporkId, ids, mws) testutils.StartNodesAndNetworks(signalerCtx, suite.T(), nodes, suite.networks, 100*time.Millisecond) blobExchangeChannel := channels.Channel("blob-exchange") @@ -118,10 +119,10 @@ func (suite *BlobServiceTestSuite) SetupTest() { // let nodes connect to each other only after they are all listening on Bitswap topologyActive.Store(true) suite.Require().Eventually(func() bool { - for i, mw := range mws { + for i, libp2pNode := range nodes { for j := i + 1; j < suite.numNodes; j++ { - connected, err := mw.IsConnected(ids[j].NodeID) - suite.Require().NoError(err) + connected, err := libp2pNode.IsConnected(nodes[j].Host().ID()) + require.NoError(suite.T(), err) if !connected { return false } diff --git a/network/test/echoengine_test.go b/network/test/echoengine_test.go index 3c544279b6f..e329a83e780 100644 --- a/network/test/echoengine_test.go +++ b/network/test/echoengine_test.go @@ -62,7 +62,7 @@ func (suite *EchoEngineTestSuite) SetupTest() { nodes, testutils.MiddlewareConfigFixture(suite.T(), sporkId), mocknetwork.NewViolationsConsumer(suite.T())) - suite.nets = testutils.NetworksFixture(suite.T(), suite.ids, suite.mws) + suite.nets = testutils.NetworksFixture(suite.T(), sporkId, suite.ids, suite.mws) testutils.StartNodesAndNetworks(signalerCtx, suite.T(), nodes, suite.nets, 100*time.Millisecond) } diff --git a/network/test/epochtransition_test.go b/network/test/epochtransition_test.go index 26169f91843..0632f2a9a44 100644 --- a/network/test/epochtransition_test.go +++ b/network/test/epochtransition_test.go @@ -25,6 +25,7 @@ import ( "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/internal/testutils" "github.com/onflow/flow-go/network/mocknetwork" + "github.com/onflow/flow-go/network/p2p" mockprotocol "github.com/onflow/flow-go/state/protocol/mock" "github.com/onflow/flow-go/utils/unittest" ) @@ -48,10 +49,11 @@ type MutableIdentityTableSuite struct { // testNode encapsulates the node state which includes its identity, middleware, network, // mesh engine and the id refresher type testNode struct { - id *flow.Identity - mw network.Middleware - net network.Network - engine *testutils.MeshEngine + id *flow.Identity + libp2pNode p2p.LibP2PNode + mw network.Middleware + net network.Network + engine *testutils.MeshEngine } // testNodeList encapsulates a list of test node and @@ -120,6 +122,16 @@ func (t *testNodeList) networks() []network.Network { return nets } +func (t *testNodeList) libp2pNodes() []p2p.LibP2PNode { + t.RLock() + defer t.RUnlock() + nodes := make([]p2p.LibP2PNode, len(t.nodes)) + for i, node := range t.nodes { + nodes[i] = node.libp2pNode + } + return nodes +} + func TestMutableIdentityTable(t *testing.T) { unittest.SkipUnless(t, unittest.TEST_TODO, "broken test") suite.Run(t, new(MutableIdentityTableSuite)) @@ -189,7 +201,7 @@ func (suite *MutableIdentityTableSuite) addNodes(count int) { nodes, testutils.MiddlewareConfigFixture(suite.T(), sporkId), mocknetwork.NewViolationsConsumer(suite.T())) - nets := testutils.NetworksFixture(suite.T(), ids, mws) + nets := testutils.NetworksFixture(suite.T(), sporkId, ids, mws) suite.cancels = append(suite.cancels, cancel) testutils.StartNodesAndNetworks(signalerCtx, suite.T(), nodes, nets, 100*time.Millisecond) @@ -200,10 +212,11 @@ func (suite *MutableIdentityTableSuite) addNodes(count int) { // create the test engines for i := 0; i < count; i++ { node := testNode{ - id: ids[i], - mw: mws[i], - net: nets[i], - engine: engines[i], + id: ids[i], + libp2pNode: nodes[i], + mw: mws[i], + net: nets[i], + engine: engines[i], } suite.testNodes.append(node) } @@ -226,7 +239,6 @@ func (suite *MutableIdentityTableSuite) TestNewNodeAdded() { newNode, err := suite.testNodes.lastAdded() require.NoError(suite.T(), err) newID := newNode.id - newMiddleware := newNode.mw suite.logger.Debug(). Str("new_node", newID.NodeID.String()). @@ -240,7 +252,7 @@ func (suite *MutableIdentityTableSuite) TestNewNodeAdded() { // check if the new node has sufficient connections with the existing nodes // if it does, then it has been inducted successfully in the network - suite.assertConnected(newMiddleware, ids.Filter(filter.Not(filter.HasNodeID(newID.NodeID)))) + suite.assertConnected(newNode.libp2pNode, suite.testNodes.libp2pNodes()) // check that all the engines on this new epoch can talk to each other using any of the three networking primitives suite.assertNetworkPrimitives(ids, engs, nil, nil) @@ -250,11 +262,9 @@ func (suite *MutableIdentityTableSuite) TestNewNodeAdded() { // list (ie. as a result of an ejection or transition into an epoch where that node // has un-staked) then it cannot connect to the network. func (suite *MutableIdentityTableSuite) TestNodeRemoved() { - // removed a node removedNode := suite.removeNode() removedID := removedNode.id - removedMiddleware := removedNode.mw removedEngine := removedNode.engine // update IDs for all the remaining nodes @@ -265,7 +275,7 @@ func (suite *MutableIdentityTableSuite) TestNodeRemoved() { remainingEngs := suite.testNodes.engines() // assert that the removed node has no connections with any of the other nodes - suite.assertDisconnected(removedMiddleware, remainingIDs) + suite.assertDisconnected(removedNode.libp2pNode, suite.testNodes.libp2pNodes()) // check that all remaining engines can still talk to each other while the ones removed can't // using any of the three networking primitives @@ -284,15 +294,12 @@ func (suite *MutableIdentityTableSuite) TestNodesAddedAndRemoved() { // remove a node removedNode := suite.removeNode() removedID := removedNode.id - removedMiddleware := removedNode.mw removedEngine := removedNode.engine // add a node suite.addNodes(1) newNode, err := suite.testNodes.lastAdded() require.NoError(suite.T(), err) - newID := newNode.id - newMiddleware := newNode.mw // update all current nodes suite.signalIdentityChanged() @@ -301,10 +308,10 @@ func (suite *MutableIdentityTableSuite) TestNodesAddedAndRemoved() { remainingEngs := suite.testNodes.engines() // check if the new node has sufficient connections with the existing nodes - suite.assertConnected(newMiddleware, remainingIDs.Filter(filter.Not(filter.HasNodeID(newID.NodeID)))) + suite.assertConnected(newNode.libp2pNode, suite.testNodes.libp2pNodes()) // assert that the removed node has no connections with any of the other nodes - suite.assertDisconnected(removedMiddleware, remainingIDs) + suite.assertDisconnected(removedNode.libp2pNode, suite.testNodes.libp2pNodes()) // check that all remaining engines can still talk to each other while the ones removed can't // using any of the three networking primitives @@ -317,13 +324,17 @@ func (suite *MutableIdentityTableSuite) TestNodesAddedAndRemoved() { // assertConnected checks that the middleware of a node is directly connected // to at least half of the other nodes. -func (suite *MutableIdentityTableSuite) assertConnected(mw network.Middleware, ids flow.IdentityList) { +func (suite *MutableIdentityTableSuite) assertConnected(thisNode p2p.LibP2PNode, allNodes []p2p.LibP2PNode) { t := suite.T() - threshold := len(ids) / 2 + threshold := len(allNodes) / 2 require.Eventuallyf(t, func() bool { connections := 0 - for _, id := range ids { - connected, err := mw.IsConnected(id.NodeID) + for _, node := range allNodes { + if node == thisNode { + // we don't want to check if a node is connected to itself + continue + } + connected, err := thisNode.IsConnected(node.Host().ID()) require.NoError(t, err) if connected { connections++ @@ -339,11 +350,11 @@ func (suite *MutableIdentityTableSuite) assertConnected(mw network.Middleware, i // assertDisconnected checks that the middleware of a node is not connected to any of the other nodes specified in the // ids list -func (suite *MutableIdentityTableSuite) assertDisconnected(mw network.Middleware, ids flow.IdentityList) { +func (suite *MutableIdentityTableSuite) assertDisconnected(thisNode p2p.LibP2PNode, allNodes []p2p.LibP2PNode) { t := suite.T() require.Eventuallyf(t, func() bool { - for _, id := range ids { - connected, err := mw.IsConnected(id.NodeID) + for _, node := range allNodes { + connected, err := thisNode.IsConnected(node.Host().ID()) require.NoError(t, err) if connected { return false diff --git a/network/test/meshengine_test.go b/network/test/meshengine_test.go index bb519ccb143..edf8222c223 100644 --- a/network/test/meshengine_test.go +++ b/network/test/meshengine_test.go @@ -119,7 +119,7 @@ func (suite *MeshEngineTestSuite) SetupTest() { libP2PNodes, testutils.MiddlewareConfigFixture(suite.T(), sporkId), mocknetwork.NewViolationsConsumer(suite.T())) - suite.nets = testutils.NetworksFixture(suite.T(), suite.ids, suite.mws) + suite.nets = testutils.NetworksFixture(suite.T(), sporkId, suite.ids, suite.mws) testutils.StartNodesAndNetworks(signalerCtx, suite.T(), libP2PNodes, suite.nets, 100*time.Millisecond) for _, observableConnMgr := range tagObservables { diff --git a/network/test/middleware_test.go b/network/test/middleware_test.go index 1f3311a2da0..bf46249782e 100644 --- a/network/test/middleware_test.go +++ b/network/test/middleware_test.go @@ -42,10 +42,6 @@ import ( "github.com/onflow/flow-go/utils/unittest" ) -const testChannel = channels.TestNetworkChannel - -// tagsObserver is used to observe peer connections and their tags within a GossipSub mesh. -// It plays a crucial role in verifying the formation of a mesh within unit tests. // libp2p emits a call to `Protect` with a topic-specific tag upon establishing each peering connection in a GossipSUb mesh, see: // https://github.com/libp2p/go-libp2p-pubsub/blob/master/tag_tracer.go // One way to make sure such a mesh has formed, asynchronously, in unit tests, is to wait for libp2p.GossipSubD such calls, @@ -189,7 +185,7 @@ func (m *MiddlewareTestSuite) SetupTest() { mw.SetOverlay(m.ov[i]) mw.Start(m.mwCtx) unittest.RequireComponentsReadyBefore(m.T(), 100*time.Millisecond, mw) - require.NoError(m.T(), mw.Subscribe(testChannel)) + require.NoError(m.T(), mw.Subscribe(channels.TestNetworkChannel)) } } @@ -243,9 +239,9 @@ func (m *MiddlewareTestSuite) TestUpdateNodeAddresses() { // needed to enable ID translation m.providers[0].SetIdentities(idList) - outMsg, err := network.NewOutgoingScope( + outMsg, err := message.NewOutgoingScope( flow.IdentifierList{newId.NodeID}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, m.sporkId), &libp2pmessage.TestMessage{ Text: "TestUpdateNodeAddresses", }, @@ -334,7 +330,7 @@ func (m *MiddlewareTestSuite) TestUnicastRateLimit_Messages() { calls := atomic.NewUint64(0) ch := make(chan struct{}) - overlay.On("Receive", mockery.AnythingOfType("*network.IncomingMessageScope")).Return(nil).Run(func(args mockery.Arguments) { + overlay.On("Receive", mockery.AnythingOfType("*message.IncomingMessageScope")).Return(nil).Run(func(args mockery.Arguments) { calls.Inc() if calls.Load() >= 5 { close(ch) @@ -352,7 +348,7 @@ func (m *MiddlewareTestSuite) TestUnicastRateLimit_Messages() { newMw.Start(irrecoverableCtx) unittest.RequireComponentsReadyBefore(m.T(), 100*time.Millisecond, newMw) - require.NoError(m.T(), newMw.Subscribe(testChannel)) + require.NoError(m.T(), newMw.Subscribe(channels.TestNetworkChannel)) // needed to enable ID translation m.providers[0].SetIdentities(idList) @@ -371,9 +367,9 @@ func (m *MiddlewareTestSuite) TestUnicastRateLimit_Messages() { // to be invoked at-least once. We send 10 messages due to the flakiness that is caused by async stream // handling of streams. for i := 0; i < 10; i++ { - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{newId.NodeID}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, m.sporkId), &libp2pmessage.TestMessage{ Text: fmt.Sprintf("hello-%d", i), }, @@ -395,9 +391,9 @@ func (m *MiddlewareTestSuite) TestUnicastRateLimit_Messages() { // eventually the rate limited node should be able to reconnect and send messages require.Eventually(m.T(), func() bool { - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{newId.NodeID}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, m.sporkId), &libp2pmessage.TestMessage{ Text: "hello", }, @@ -494,7 +490,7 @@ func (m *MiddlewareTestSuite) TestUnicastRateLimit_Bandwidth() { newMw.Start(irrecoverableCtx) unittest.RequireComponentsReadyBefore(m.T(), 100*time.Millisecond, newMw) - require.NoError(m.T(), newMw.Subscribe(testChannel)) + require.NoError(m.T(), newMw.Subscribe(channels.TestNetworkChannel)) idList := flow.IdentityList(append(m.ids, newId)) @@ -507,9 +503,9 @@ func (m *MiddlewareTestSuite) TestUnicastRateLimit_Bandwidth() { b[i] = byte('X') } - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{newId.NodeID}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, m.sporkId), &libp2pmessage.TestMessage{ Text: string(b), }, @@ -545,9 +541,9 @@ func (m *MiddlewareTestSuite) TestUnicastRateLimit_Bandwidth() { // eventually the rate limited node should be able to reconnect and send messages require.Eventually(m.T(), func() bool { - msg, err = network.NewOutgoingScope( + msg, err = message.NewOutgoingScope( flow.IdentifierList{newId.NodeID}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, m.sporkId), &libp2pmessage.TestMessage{ Text: "", }, @@ -607,9 +603,9 @@ func (m *MiddlewareTestSuite) TestPing() { lastNodeIndex := m.size - 1 expectedPayload := "TestPingContentReception" - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{m.ids[lastNodeIndex].NodeID}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, m.sporkId), &libp2pmessage.TestMessage{ Text: expectedPayload, }, @@ -621,10 +617,10 @@ func (m *MiddlewareTestSuite) TestPing() { Run(func(args mockery.Arguments) { receiveWG.Done() - msg, ok := args[0].(*network.IncomingMessageScope) + msg, ok := args[0].(network.IncomingMessageScope) require.True(m.T(), ok) - require.Equal(m.T(), testChannel, msg.Channel()) // channel + require.Equal(m.T(), channels.TestNetworkChannel, msg.Channel()) // channel require.Equal(m.T(), m.ids[firstNodeIndex].NodeID, msg.OriginId()) // sender id require.Equal(m.T(), m.ids[lastNodeIndex].NodeID, msg.TargetIDs()[0]) // target id require.Equal(m.T(), message.ProtocolTypeUnicast, msg.Protocol()) // protocol @@ -665,9 +661,9 @@ func (m *MiddlewareTestSuite) MultiPing(count int) { sendWG.Add(1) expectedPayloadText := fmt.Sprintf("hello from: %d", i) - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{m.ids[lastNodeIndex].NodeID}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, m.sporkId), &libp2pmessage.TestMessage{ Text: expectedPayloadText, }, @@ -679,10 +675,10 @@ func (m *MiddlewareTestSuite) MultiPing(count int) { Run(func(args mockery.Arguments) { receiveWG.Done() - msg, ok := args[0].(*network.IncomingMessageScope) + msg, ok := args[0].(network.IncomingMessageScope) require.True(m.T(), ok) - require.Equal(m.T(), testChannel, msg.Channel()) // channel + require.Equal(m.T(), channels.TestNetworkChannel, msg.Channel()) // channel require.Equal(m.T(), m.ids[firstNodeIndex].NodeID, msg.OriginId()) // sender id require.Equal(m.T(), m.ids[lastNodeIndex].NodeID, msg.TargetIDs()[0]) // target id require.Equal(m.T(), message.ProtocolTypeUnicast, msg.Protocol()) // protocol @@ -728,9 +724,9 @@ func (m *MiddlewareTestSuite) TestEcho() { // message sent from first node to the last node. expectedSendMsg := "TestEcho" - sendMsg, err := network.NewOutgoingScope( + sendMsg, err := message.NewOutgoingScope( flow.IdentifierList{lastNode}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, m.sporkId), &libp2pmessage.TestMessage{ Text: expectedSendMsg, }, @@ -740,9 +736,9 @@ func (m *MiddlewareTestSuite) TestEcho() { // reply from last node to the first node. expectedReplyMsg := "TestEcho response" - replyMsg, err := network.NewOutgoingScope( + replyMsg, err := message.NewOutgoingScope( flow.IdentifierList{firstNode}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, m.sporkId), &libp2pmessage.TestMessage{ Text: expectedReplyMsg, }, @@ -756,16 +752,16 @@ func (m *MiddlewareTestSuite) TestEcho() { wg.Done() // sanity checks the message content. - msg, ok := args[0].(*network.IncomingMessageScope) + msg, ok := args[0].(network.IncomingMessageScope) require.True(m.T(), ok) - require.Equal(m.T(), testChannel, msg.Channel()) // channel + require.Equal(m.T(), channels.TestNetworkChannel, msg.Channel()) // channel require.Equal(m.T(), m.ids[first].NodeID, msg.OriginId()) // sender id require.Equal(m.T(), lastNode, msg.TargetIDs()[0]) // target id require.Equal(m.T(), message.ProtocolTypeUnicast, msg.Protocol()) // protocol require.Equal(m.T(), expectedSendMsg, msg.DecodedPayload().(*libp2pmessage.TestMessage).Text) // payload // event id - eventId, err := network.EventId(msg.Channel(), msg.Proto().Payload) + eventId, err := message.EventId(msg.Channel(), msg.Proto().Payload) require.NoError(m.T(), err) require.True(m.T(), bytes.Equal(eventId, msg.EventID())) @@ -779,16 +775,16 @@ func (m *MiddlewareTestSuite) TestEcho() { Run(func(args mockery.Arguments) { wg.Done() // sanity checks the message content. - msg, ok := args[0].(*network.IncomingMessageScope) + msg, ok := args[0].(network.IncomingMessageScope) require.True(m.T(), ok) - require.Equal(m.T(), testChannel, msg.Channel()) // channel + require.Equal(m.T(), channels.TestNetworkChannel, msg.Channel()) // channel require.Equal(m.T(), m.ids[last].NodeID, msg.OriginId()) // sender id require.Equal(m.T(), firstNode, msg.TargetIDs()[0]) // target id require.Equal(m.T(), message.ProtocolTypeUnicast, msg.Protocol()) // protocol require.Equal(m.T(), expectedReplyMsg, msg.DecodedPayload().(*libp2pmessage.TestMessage).Text) // payload // event id - eventId, err := network.EventId(msg.Channel(), msg.Proto().Payload) + eventId, err := message.EventId(msg.Channel(), msg.Proto().Payload) require.NoError(m.T(), err) require.True(m.T(), bytes.Equal(eventId, msg.EventID())) }) @@ -822,9 +818,9 @@ func (m *MiddlewareTestSuite) TestMaxMessageSize_SendDirect() { Text: string(payload), } - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{lastNode}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, m.sporkId), event, unittest.NetworkCodec().Encode, message.ProtocolTypeUnicast) @@ -850,9 +846,9 @@ func (m *MiddlewareTestSuite) TestLargeMessageSize_SendDirect() { targetSize := uint64(middleware.DefaultMaxUnicastMsgSize) + 1000 event := unittest.ChunkDataResponseMsgFixture(unittest.IdentifierFixture(), unittest.WithApproximateSize(targetSize)) - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{targetNode}, - channels.ProvideChunks, + channels.TopicFromChannel(channels.ProvideChunks, m.sporkId), event, unittest.NetworkCodec().Encode, message.ProtocolTypeUnicast) @@ -862,7 +858,7 @@ func (m *MiddlewareTestSuite) TestLargeMessageSize_SendDirect() { ch := make(chan struct{}) m.ov[targetIndex].On("Receive", mockery.Anything).Return(nil).Once(). Run(func(args mockery.Arguments) { - msg, ok := args[0].(*network.IncomingMessageScope) + msg, ok := args[0].(network.IncomingMessageScope) require.True(m.T(), ok) require.Equal(m.T(), channels.ProvideChunks, msg.Channel()) @@ -870,7 +866,7 @@ func (m *MiddlewareTestSuite) TestLargeMessageSize_SendDirect() { require.Equal(m.T(), targetNode, msg.TargetIDs()[0]) require.Equal(m.T(), message.ProtocolTypeUnicast, msg.Protocol()) - eventId, err := network.EventId(msg.Channel(), msg.Proto().Payload) + eventId, err := message.EventId(msg.Channel(), msg.Proto().Payload) require.NoError(m.T(), err) require.True(m.T(), bytes.Equal(eventId, msg.EventID())) close(ch) @@ -903,9 +899,9 @@ func (m *MiddlewareTestSuite) TestMaxMessageSize_Publish() { event := &libp2pmessage.TestMessage{ Text: string(payload), } - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{lastNode}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, m.sporkId), event, unittest.NetworkCodec().Encode, message.ProtocolTypePubSub) @@ -938,9 +934,9 @@ func (m *MiddlewareTestSuite) TestUnsubscribe() { <-msgRcvd } - message1, err := network.NewOutgoingScope( + message1, err := message.NewOutgoingScope( flow.IdentifierList{lastNode}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, m.sporkId), &libp2pmessage.TestMessage{ Text: string("hello1"), }, @@ -949,7 +945,7 @@ func (m *MiddlewareTestSuite) TestUnsubscribe() { require.NoError(m.T(), err) m.ov[last].On("Receive", mockery.Anything).Return(nil).Run(func(args mockery.Arguments) { - msg, ok := args[0].(*network.IncomingMessageScope) + msg, ok := args[0].(network.IncomingMessageScope) require.True(m.T(), ok) require.Equal(m.T(), firstNode, msg.OriginId()) msgRcvd <- struct{}{} @@ -962,13 +958,13 @@ func (m *MiddlewareTestSuite) TestUnsubscribe() { unittest.RequireReturnsBefore(m.T(), msgRcvdFun, 2*time.Second, "message not received") // now unsubscribe the target node from the channel - err = m.mws[last].Unsubscribe(testChannel) + err = m.mws[last].Unsubscribe(channels.TestNetworkChannel) assert.NoError(m.T(), err) // create and send a new message on the channel from the origin node - message2, err := network.NewOutgoingScope( + message2, err := message.NewOutgoingScope( flow.IdentifierList{lastNode}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, m.sporkId), &libp2pmessage.TestMessage{ Text: string("hello2"), }, diff --git a/network/test/unicast_authorization_test.go b/network/test/unicast_authorization_test.go index 976f702c9de..b2915fc1bd8 100644 --- a/network/test/unicast_authorization_test.go +++ b/network/test/unicast_authorization_test.go @@ -48,7 +48,8 @@ type UnicastAuthorizationTestSuite struct { // providers id providers generated at beginning of a test run providers []*unittest.UpdatableIDProvider // cancel is the cancel func from the context that was used to start the middlewares in a test run - cancel context.CancelFunc + cancel context.CancelFunc + sporkId flow.Identifier // waitCh is the channel used to wait for the middleware to perform authorization and invoke the slashing //violation's consumer before making mock assertions and cleaning up resources waitCh chan struct{} @@ -73,9 +74,9 @@ func (u *UnicastAuthorizationTestSuite) TearDownTest() { // setupMiddlewaresAndProviders will setup 2 middlewares that will be used as a sender and receiver in each suite test. func (u *UnicastAuthorizationTestSuite) setupMiddlewaresAndProviders(slashingViolationsConsumer network.ViolationsConsumer) { - sporkId := unittest.IdentifierFixture() - ids, libP2PNodes := testutils.LibP2PNodeForMiddlewareFixture(u.T(), sporkId, 2) - cfg := testutils.MiddlewareConfigFixture(u.T(), sporkId) + u.sporkId = unittest.IdentifierFixture() + ids, libP2PNodes := testutils.LibP2PNodeForMiddlewareFixture(u.T(), u.sporkId, 2) + cfg := testutils.MiddlewareConfigFixture(u.T(), u.sporkId) mws, providers := testutils.MiddlewareFixtures(u.T(), ids, libP2PNodes, cfg, slashingViolationsConsumer) require.Len(u.T(), ids, 2) require.Len(u.T(), providers, 2) @@ -154,12 +155,12 @@ func (u *UnicastAuthorizationTestSuite) TestUnicastAuthorization_UnstakedPeer() u.startMiddlewares(overlay) - require.NoError(u.T(), u.receiverMW.Subscribe(testChannel)) - require.NoError(u.T(), u.senderMW.Subscribe(testChannel)) + require.NoError(u.T(), u.receiverMW.Subscribe(channels.TestNetworkChannel)) + require.NoError(u.T(), u.senderMW.Subscribe(channels.TestNetworkChannel)) - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{u.receiverID.NodeID}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, u.sporkId), &libp2pmessage.TestMessage{ Text: string("hello"), }, @@ -216,12 +217,12 @@ func (u *UnicastAuthorizationTestSuite) TestUnicastAuthorization_EjectedPeer() { u.startMiddlewares(overlay) - require.NoError(u.T(), u.receiverMW.Subscribe(testChannel)) - require.NoError(u.T(), u.senderMW.Subscribe(testChannel)) + require.NoError(u.T(), u.receiverMW.Subscribe(channels.TestNetworkChannel)) + require.NoError(u.T(), u.senderMW.Subscribe(channels.TestNetworkChannel)) - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{u.receiverID.NodeID}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, u.sporkId), &libp2pmessage.TestMessage{ Text: string("hello"), }, @@ -280,9 +281,9 @@ func (u *UnicastAuthorizationTestSuite) TestUnicastAuthorization_UnauthorizedPee require.NoError(u.T(), u.receiverMW.Subscribe(channel)) require.NoError(u.T(), u.senderMW.Subscribe(channel)) - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{u.receiverID.NodeID}, - channel, + channels.TopicFromChannel(channels.ConsensusCommittee, u.sporkId), &libp2pmessage.TestMessage{ Text: string("hello"), }, @@ -340,12 +341,12 @@ func (u *UnicastAuthorizationTestSuite) TestUnicastAuthorization_UnknownMsgCode( u.startMiddlewares(overlay) - require.NoError(u.T(), u.receiverMW.Subscribe(testChannel)) - require.NoError(u.T(), u.senderMW.Subscribe(testChannel)) + require.NoError(u.T(), u.receiverMW.Subscribe(channels.TestNetworkChannel)) + require.NoError(u.T(), u.senderMW.Subscribe(channels.TestNetworkChannel)) - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{u.receiverID.NodeID}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, u.sporkId), &libp2pmessage.TestMessage{ Text: "hello", }, @@ -410,12 +411,12 @@ func (u *UnicastAuthorizationTestSuite) TestUnicastAuthorization_WrongMsgCode() u.startMiddlewares(overlay) - require.NoError(u.T(), u.receiverMW.Subscribe(testChannel)) - require.NoError(u.T(), u.senderMW.Subscribe(testChannel)) + require.NoError(u.T(), u.receiverMW.Subscribe(channels.TestNetworkChannel)) + require.NoError(u.T(), u.senderMW.Subscribe(channels.TestNetworkChannel)) - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{u.receiverID.NodeID}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, u.sporkId), &libp2pmessage.TestMessage{ Text: "hello", }, @@ -445,9 +446,9 @@ func (u *UnicastAuthorizationTestSuite) TestUnicastAuthorization_PublicChannel() u.setupMiddlewaresAndProviders(slashingViolationsConsumer) expectedPayload := "hello" - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{u.receiverID.NodeID}, - testChannel, + channels.TopicFromChannel(channels.TestNetworkChannel, u.sporkId), &libp2pmessage.TestMessage{ Text: expectedPayload, }, @@ -470,10 +471,10 @@ func (u *UnicastAuthorizationTestSuite) TestUnicastAuthorization_PublicChannel() Run(func(args mockery.Arguments) { close(u.waitCh) - msg, ok := args[0].(*network.IncomingMessageScope) + msg, ok := args[0].(network.IncomingMessageScope) require.True(u.T(), ok) - require.Equal(u.T(), testChannel, msg.Channel()) // channel + require.Equal(u.T(), channels.TestNetworkChannel, msg.Channel()) // channel require.Equal(u.T(), u.senderID.NodeID, msg.OriginId()) // sender id require.Equal(u.T(), u.receiverID.NodeID, msg.TargetIDs()[0]) // target id require.Equal(u.T(), message.ProtocolTypeUnicast, msg.Protocol()) // protocol @@ -482,8 +483,8 @@ func (u *UnicastAuthorizationTestSuite) TestUnicastAuthorization_PublicChannel() u.startMiddlewares(overlay) - require.NoError(u.T(), u.receiverMW.Subscribe(testChannel)) - require.NoError(u.T(), u.senderMW.Subscribe(testChannel)) + require.NoError(u.T(), u.receiverMW.Subscribe(channels.TestNetworkChannel)) + require.NoError(u.T(), u.senderMW.Subscribe(channels.TestNetworkChannel)) // send message via unicast err = u.senderMW.SendDirect(msg) @@ -543,9 +544,9 @@ func (u *UnicastAuthorizationTestSuite) TestUnicastAuthorization_UnauthorizedUni // messages.BlockProposal is not authorized to be sent via unicast over the ConsensusCommittee channel payload := unittest.ProposalFixture() - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{u.receiverID.NodeID}, - channel, + channels.TopicFromChannel(channel, u.sporkId), payload, unittest.NetworkCodec().Encode, message.ProtocolTypeUnicast) @@ -600,9 +601,9 @@ func (u *UnicastAuthorizationTestSuite) TestUnicastAuthorization_ReceiverHasNoSu channel := channels.TestNetworkChannel - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{u.receiverID.NodeID}, - channel, + channels.TopicFromChannel(channel, u.sporkId), &libp2pmessage.TestMessage{ Text: "TestUnicastAuthorization_ReceiverHasNoSubscription", }, @@ -626,9 +627,9 @@ func (u *UnicastAuthorizationTestSuite) TestUnicastAuthorization_ReceiverHasSubs u.setupMiddlewaresAndProviders(slashingViolationsConsumer) channel := channels.RequestReceiptsByBlockID - msg, err := network.NewOutgoingScope( + msg, err := message.NewOutgoingScope( flow.IdentifierList{u.receiverID.NodeID}, - channel, + channels.TopicFromChannel(channel, u.sporkId), &messages.EntityRequest{}, unittest.NetworkCodec().Encode, message.ProtocolTypeUnicast) @@ -652,7 +653,7 @@ func (u *UnicastAuthorizationTestSuite) TestUnicastAuthorization_ReceiverHasSubs Run(func(args mockery.Arguments) { close(u.waitCh) - msg, ok := args[0].(*network.IncomingMessageScope) + msg, ok := args[0].(network.IncomingMessageScope) require.True(u.T(), ok) require.Equal(u.T(), channel, msg.Channel()) // channel diff --git a/utils/unittest/fixtures.go b/utils/unittest/fixtures.go index e69a263ce6d..288494a2f99 100644 --- a/utils/unittest/fixtures.go +++ b/utils/unittest/fixtures.go @@ -15,6 +15,7 @@ import ( "github.com/onflow/cadence" sdk "github.com/onflow/flow-go-sdk" + "github.com/onflow/flow-go/network/message" hotstuff "github.com/onflow/flow-go/consensus/hotstuff/model" "github.com/onflow/flow-go/crypto" @@ -40,7 +41,6 @@ import ( "github.com/onflow/flow-go/module/mempool/entity" "github.com/onflow/flow-go/module/signature" "github.com/onflow/flow-go/module/updatable_configs" - "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/network/p2p/keyutils" "github.com/onflow/flow-go/state/protocol" @@ -2422,7 +2422,7 @@ func GetFlowProtocolEventID( ) flow.Identifier { payload, err := NetworkCodec().Encode(event) require.NoError(t, err) - eventIDHash, err := network.EventId(channel, payload) + eventIDHash, err := message.EventId(channel, payload) require.NoError(t, err) return flow.HashToID(eventIDHash) }