Skip to content

Commit

Permalink
Merge branch 'master' into tarak/go1.20
Browse files Browse the repository at this point in the history
  • Loading branch information
tarakby authored Jul 19, 2023
2 parents a97a02a + 9682381 commit e67bcc9
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 7 deletions.
3 changes: 1 addition & 2 deletions network/alsp/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func (m *MisbehaviorReportManager) HandleMisbehaviorReport(channel channels.Chan
Hex("misbehaving_id", logging.ID(report.OriginId())).
Str("reason", report.Reason().String()).
Float64("penalty", report.Penalty()).Logger()
lg.Trace().Msg("received misbehavior report")
m.metrics.OnMisbehaviorReported(channel.String(), report.Reason().String())

nonce := [internal.NonceSize]byte{}
Expand Down Expand Up @@ -333,7 +334,6 @@ func (m *MisbehaviorReportManager) onHeartbeat() error {
Cause: network.DisallowListedCauseAlsp, // sets the ALSP disallow listing cause on node
})
}

// each time we decay the penalty by the decay speed, the penalty is a negative number, and the decay speed
// is a positive number. So the penalty is getting closer to zero.
// We use math.Min() to make sure the penalty is never positive.
Expand Down Expand Up @@ -425,7 +425,6 @@ func (m *MisbehaviorReportManager) processMisbehaviorReport(report internal.Repo
// we should crash the node in this case to prevent further misbehavior reports from being lost and fix the bug.
return fmt.Errorf("failed to apply penalty to the spam record: %w", err)
}

lg.Debug().Float64("updated_penalty", updatedPenalty).Msg("misbehavior report handled")
return nil
}
101 changes: 97 additions & 4 deletions network/alsp/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/onflow/flow-go/network/mocknetwork"
"github.com/onflow/flow-go/network/p2p"
p2ptest "github.com/onflow/flow-go/network/p2p/test"
"github.com/onflow/flow-go/network/slashing"
"github.com/onflow/flow-go/utils/unittest"
)

Expand Down Expand Up @@ -183,7 +184,7 @@ func TestHandleReportedMisbehavior_Cache_Integration(t *testing.T) {
// TestHandleReportedMisbehavior_And_DisallowListing_Integration implements an end-to-end integration test for the
// handling of reported misbehavior and disallow listing.
//
// The test sets up 3 nodes, one victim, one honest, and one (alledged) spammer.
// The test sets up 3 nodes, one victim, one honest, and one (alleged) spammer.
// Initially, the test ensures that all nodes are connected to each other.
// Then, test imitates that victim node reports the spammer node for spamming.
// The test generates enough spam reports to trigger the disallow-listing of the victim node.
Expand All @@ -196,11 +197,11 @@ func TestHandleReportedMisbehavior_And_DisallowListing_Integration(t *testing.T)
// this test is assessing the integration of the ALSP manager with the network. As the ALSP manager is an attribute
// of the network, we need to configure the ALSP manager via the network configuration, and let the network create
// the ALSP manager.
var victimSpamRecordCacheCache alsp.SpamRecordCache
var victimSpamRecordCache alsp.SpamRecordCache
cfg.Opts = []alspmgr.MisbehaviorReportManagerOption{
alspmgr.WithSpamRecordsCacheFactory(func(logger zerolog.Logger, size uint32, metrics module.HeroCacheMetrics) alsp.SpamRecordCache {
victimSpamRecordCacheCache = internal.NewSpamRecordCache(size, logger, metrics, model.SpamRecordFactory())
return victimSpamRecordCacheCache
victimSpamRecordCache = internal.NewSpamRecordCache(size, logger, metrics, model.SpamRecordFactory())
return victimSpamRecordCache
}),
}

Expand Down Expand Up @@ -266,6 +267,98 @@ func TestHandleReportedMisbehavior_And_DisallowListing_Integration(t *testing.T)
p2ptest.EnsureNotConnectedBetweenGroups(t, ctx, []p2p.LibP2PNode{nodes[victimIndex]}, []p2p.LibP2PNode{nodes[spammerIndex]})
}

// TestHandleReportedMisbehavior_And_SlashingViolationsConsumer_Integration implements an end-to-end integration test for the
// handling of reported misbehavior from the slashing.ViolationsConsumer.
//
// The test sets up one victim, one honest, and one (alleged) spammer for each of the current slashing violations.
// Initially, the test ensures that all nodes are connected to each other.
// Then, test imitates the slashing violations consumer on the victim node reporting misbehavior's for each slashing violation.
// The test generates enough slashing violations to trigger the connection to each of the spamming nodes to be eventually pruned.
// The test ensures that the victim node is disconnected from all spammer nodes.
// The test ensures that despite attempting on connections, no inbound or outbound connections between the victim and
// the pruned spammer nodes are established.
func TestHandleReportedMisbehavior_And_SlashingViolationsConsumer_Integration(t *testing.T) {

// create 1 victim node, 1 honest node and a node for each slashing violation
ids, nodes, _ := testutils.LibP2PNodeForMiddlewareFixture(t, 7) // creates 7 nodes (1 victim, 1 honest, 5 spammer nodes one for each slashing violation).
mws, _ := testutils.MiddlewareFixtures(t, ids, nodes, testutils.MiddlewareConfigFixture(t), mocknetwork.NewViolationsConsumer(t))
networkCfg := testutils.NetworkConfigFixture(t, *ids[0], ids, mws[0], p2p.WithAlspConfig(managerCfgFixture(t)))
victimNetwork, err := p2p.NewNetwork(networkCfg)
require.NoError(t, err)

// create slashing violations consumer with victim node network providing the network.MisbehaviorReportConsumer interface
violationsConsumer := slashing.NewSlashingViolationsConsumer(unittest.Logger(), metrics.NewNoopCollector(), victimNetwork)
mws[0].SetSlashingViolationsConsumer(violationsConsumer)

ctx, cancel := context.WithCancel(context.Background())
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
testutils.StartNodesAndNetworks(signalerCtx, t, nodes, []network.Network{victimNetwork}, 100*time.Millisecond)
defer testutils.StopComponents[p2p.LibP2PNode](t, nodes, 100*time.Millisecond)
defer cancel()

p2ptest.LetNodesDiscoverEachOther(t, ctx, nodes, ids)
// initially victim and misbehaving nodes should be able to connect to each other.
p2ptest.TryConnectionAndEnsureConnected(t, ctx, nodes)

// each slashing violation func is mapped to a violation with the identity of one of the misbehaving nodes
// index of the victim node in the nodes slice.
victimIndex := 0
honestNodeIndex := 1
invalidMessageIndex := 2
senderEjectedIndex := 3
unauthorizedUnicastOnChannelIndex := 4
unauthorizedPublishOnChannelIndex := 5
unknownMsgTypeIndex := 6
slashingViolationTestCases := []struct {
violationsConsumerFunc func(violation *network.Violation)
violation *network.Violation
}{
{violationsConsumer.OnUnAuthorizedSenderError, &network.Violation{Identity: ids[invalidMessageIndex]}},
{violationsConsumer.OnSenderEjectedError, &network.Violation{Identity: ids[senderEjectedIndex]}},
{violationsConsumer.OnUnauthorizedUnicastOnChannel, &network.Violation{Identity: ids[unauthorizedUnicastOnChannelIndex]}},
{violationsConsumer.OnUnauthorizedPublishOnChannel, &network.Violation{Identity: ids[unauthorizedPublishOnChannelIndex]}},
{violationsConsumer.OnUnknownMsgTypeError, &network.Violation{Identity: ids[unknownMsgTypeIndex]}},
}

violationsWg := sync.WaitGroup{}
violationCount := 120
for _, testCase := range slashingViolationTestCases {
for i := 0; i < violationCount; i++ {
testCase := testCase
violationsWg.Add(1)
go func() {
defer violationsWg.Done()
testCase.violationsConsumerFunc(testCase.violation)
}()
}
}
unittest.RequireReturnsBefore(t, violationsWg.Wait, 100*time.Millisecond, "slashing violations not reported in time")

forEachMisbehavingNode := func(f func(i int)) {
for misbehavingNodeIndex := 2; misbehavingNodeIndex <= len(nodes)-1; misbehavingNodeIndex++ {
f(misbehavingNodeIndex)
}
}

// ensures all misbehaving nodes are disconnected from the victim node
forEachMisbehavingNode(func(misbehavingNodeIndex int) {
p2ptest.RequireEventuallyNotConnected(t, []p2p.LibP2PNode{nodes[victimIndex]}, []p2p.LibP2PNode{nodes[misbehavingNodeIndex]}, 100*time.Millisecond, 2*time.Second)
})

// despite being disconnected from the victim node, misbehaving nodes and the honest node are still connected.
forEachMisbehavingNode(func(misbehavingNodeIndex int) {
p2ptest.RequireConnectedEventually(t, []p2p.LibP2PNode{nodes[honestNodeIndex], nodes[misbehavingNodeIndex]}, 1*time.Millisecond, 100*time.Millisecond)
})

// despite disconnecting misbehaving nodes, ensure that (victim and honest) are still connected.
p2ptest.RequireConnectedEventually(t, []p2p.LibP2PNode{nodes[honestNodeIndex], nodes[victimIndex]}, 1*time.Millisecond, 100*time.Millisecond)

// while misbehaving nodes are disconnected, they cannot connect to the victim node. Also, the victim node cannot directly dial and connect to the misbehaving nodes until each node's peer score decays.
forEachMisbehavingNode(func(misbehavingNodeIndex int) {
p2ptest.EnsureNotConnectedBetweenGroups(t, ctx, []p2p.LibP2PNode{nodes[victimIndex]}, []p2p.LibP2PNode{nodes[misbehavingNodeIndex]})
})
}

// TestMisbehaviorReportMetrics tests the recording of misbehavior report metrics.
// It checks that when a misbehavior report is received by the ALSP manager, the metrics are recorded.
// It fails the test if the metrics are not recorded or if they are recorded incorrectly.
Expand Down
1 change: 0 additions & 1 deletion network/slashing/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (c *Consumer) reportMisbehavior(misbehavior network.Misbehavior, violation
Err(err).
Str("peerID", violation.PeerID).
Msg("failed to create misbehavior report")

}
c.misbehaviorReportConsumer.ReportMisbehaviorOnChannel(violation.Channel, report)
}
Expand Down

0 comments on commit e67bcc9

Please sign in to comment.