Skip to content

Commit

Permalink
Merge pull request #5391 from onflow/khalil/6934-invalid-topicid-thre…
Browse files Browse the repository at this point in the history
…shold

[Networking] Enhance Gossipsub Resilience: Configurable Threshold for Invalid Topic IDs in Control Messages
  • Loading branch information
kc1116 authored Feb 23, 2024
2 parents 514461a + 3e393bb commit 8f4ecc4
Show file tree
Hide file tree
Showing 15 changed files with 570 additions and 136 deletions.
6 changes: 6 additions & 0 deletions config/default-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ network-config:
# to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues.
# A topic id is considered duplicate if it appears more than once in a single GRAFT or PRUNE message.
duplicate-topic-id-threshold: 50
# Maximum number of total invalid topic ids in GRAFTs/PRUNEs of a single RPC. Ideally this should be 0, but we allow for some tolerance
# to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues. Exceeding this threshold causes RPC inspection failure with an invalid control message notification (penalty).
invalid-topic-id-threshold: 50
ihave:
# The maximum allowed number of iHave messages in a single RPC message.
# Each iHave message represents the list of message ids. When the total number of iHave messages
Expand All @@ -181,6 +184,9 @@ network-config:
# Ideally, an iHave message should not have any duplicate message IDs, hence a message id is considered duplicate when it is repeated more than once
# within the same iHave message. When the total number of duplicate message ids in a single iHave message exceeds this threshold, the inspection of message will fail.
duplicate-message-id-threshold: 100
# Maximum number of total invalid topic ids in an IHAVE message on a single RPC. Ideally this should be 0, but we allow for some tolerance
# to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues. Exceeding this threshold causes RPC inspection failure with an invalid control message notification (penalty).
invalid-topic-id-threshold: 50
iwant:
# The maximum allowed number of iWant messages in a single RPC message.
# Each iWant message represents the list of message ids. When the total number of iWant messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestValidationInspector_InvalidTopicId_Detection(t *testing.T) {
require.True(t, ok)
require.Equal(t, notification.TopicType, p2p.CtrlMsgNonClusterTopicType, "IsClusterPrefixed is expected to be false, no RPC with cluster prefixed topic sent in this test")
require.Equal(t, spammer.SpammerNode.ID(), notification.PeerID)
require.True(t, channels.IsInvalidTopicErr(notification.Error))
require.True(t, validation.IsInvalidTopicIDThresholdExceeded(notification.Error))
switch notification.MsgType {
case p2pmsg.CtrlMsgGraft:
invGraftNotifCount.Inc()
Expand Down
21 changes: 18 additions & 3 deletions module/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,17 @@ type GossipSubRpcValidationInspectorMetrics interface {
//
// duplicateTopicIds: the total number of duplicate topic ids received by the node on the iHave messages at the end of the async inspection of the RPC.
// duplicateMessageIds: the number of duplicate message ids received by the node on the iHave messages at the end of the async inspection of the RPC.
OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds int)
// invalidTopicIds: the number of invalid topic ids received by the node on the iHave messages at the end of the async inspection of the RPC.
OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds, invalidTopicIds int)

// OnIHaveDuplicateTopicIdsExceedThreshold tracks the number of times that the async inspection of iHave messages of a single RPC failed due to the total number of duplicate topic ids
// received by the node on the iHave messages of that RPC exceeding the threshold, which results in a misbehaviour report.
OnIHaveDuplicateTopicIdsExceedThreshold()

// OnIHaveInvalidTopicIdsExceedThreshold tracks the number of times that the async inspection of iHave messages of a single RPC failed due to the total number of invalid topic ids
// received by the node on the iHave messages of that RPC exceeding the threshold, which results in a misbehaviour report.
OnIHaveInvalidTopicIdsExceedThreshold()

// OnIHaveDuplicateMessageIdsExceedThreshold tracks the number of times that the async inspection of iHave messages of a single RPC failed due to the total number of duplicate message ids
// received by the node on an iHave message exceeding the threshold, which results in a misbehaviour report.
OnIHaveDuplicateMessageIdsExceedThreshold()
Expand Down Expand Up @@ -343,19 +348,29 @@ type GossipSubRpcValidationInspectorMetrics interface {
// received by the node on prune messages of the same RPC exceeding the threshold, which results in a misbehaviour report.
OnPruneDuplicateTopicIdsExceedThreshold()

// OnPruneInvalidTopicIdsExceedThreshold tracks the number of times that the async inspection of prune messages for an RPC failed due to the number of invalid topic ids
// received by the node on prune messages of the same RPC exceeding the threshold, which results in a misbehaviour report.
OnPruneInvalidTopicIdsExceedThreshold()

// OnPruneMessageInspected is called at the end of the async inspection of prune messages of the RPC, regardless of the result of the inspection.
// Args:
// duplicateTopicIds: the number of duplicate topic ids received by the node on the prune messages of the RPC at the end of the async inspection prunes.
OnPruneMessageInspected(duplicateTopicIds int)
// invalidTopicIds: the number of invalid topic ids received by the node on the prune messages at the end of the async inspection of a single RPC.
OnPruneMessageInspected(duplicateTopicIds, invalidTopicIds int)

// OnGraftDuplicateTopicIdsExceedThreshold tracks the number of times that the async inspection of the graft messages of a single RPC failed due to the number of duplicate topic ids
// received by the node on graft messages of the same RPC exceeding the threshold, which results in a misbehaviour report.
OnGraftDuplicateTopicIdsExceedThreshold()

// OnGraftInvalidTopicIdsExceedThreshold tracks the number of times that the async inspection of the graft messages of a single RPC failed due to the number of invalid topic ids
// received by the node on graft messages of the same RPC exceeding the threshold, which results in a misbehaviour report.
OnGraftInvalidTopicIdsExceedThreshold()

// OnGraftMessageInspected is called at the end of the async inspection of graft messages of a single RPC, regardless of the result of the inspection.
// Args:
// duplicateTopicIds: the number of duplicate topic ids received by the node on the graft messages at the end of the async inspection of a single RPC.
OnGraftMessageInspected(duplicateTopicIds int)
// invalidTopicIds: the number of invalid topic ids received by the node on the graft messages at the end of the async inspection of a single RPC.
OnGraftMessageInspected(duplicateTopicIds, invalidTopicIds int)

// OnPublishMessageInspected is called at the end of the async inspection of publish messages of a single RPC, regardless of the result of the inspection.
// It tracks the total number of errors detected during the async inspection of the rpc together with their individual breakdown.
Expand Down
83 changes: 79 additions & 4 deletions module/metrics/gossipsub_rpc_validation_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,23 @@ type GossipSubRpcValidationInspectorMetrics struct {

// graft inspection
graftDuplicateTopicIdsHistogram prometheus.Histogram
graftInvalidTopicIdsHistogram prometheus.Histogram
graftDuplicateTopicIdsExceedThresholdCount prometheus.Counter
graftInvalidTopicIdsExceedThresholdCount prometheus.Counter

// prune inspection
pruneDuplicateTopicIdsHistogram prometheus.Histogram
pruneInvalidTopicIdsHistogram prometheus.Histogram
pruneDuplicateTopicIdsExceedThresholdCount prometheus.Counter
pruneInvalidTopicIdsExceedThresholdCount prometheus.Counter

// iHave inspection
iHaveDuplicateMessageIdHistogram prometheus.Histogram
iHaveDuplicateTopicIdHistogram prometheus.Histogram
iHaveInvalidTopicIdHistogram prometheus.Histogram
iHaveDuplicateMessageIdExceedThresholdCount prometheus.Counter
iHaveDuplicateTopicIdExceedThresholdCount prometheus.Counter
iHaveInvalidTopicIdExceedThresholdCount prometheus.Counter

// iWant inspection
iWantDuplicateMessageIdHistogram prometheus.Histogram
Expand Down Expand Up @@ -167,6 +173,14 @@ func NewGossipSubRPCValidationInspectorMetrics(prefix string) *GossipSubRpcValid
Help: "number of duplicate topic ids received from gossipsub protocol during the async inspection of a single RPC",
})

gc.iHaveInvalidTopicIdHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespaceNetwork,
Subsystem: subsystemGossip,
Buckets: []float64{1, 100, 1000},
Name: gc.prefix + "rpc_inspection_ihave_invalid_topic_ids_count",
Help: "number of invalid topic ids received from gossipsub protocol during the async inspection of a single RPC",
})

gc.iHaveDuplicateMessageIdExceedThresholdCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespaceNetwork,
Subsystem: subsystemGossip,
Expand All @@ -181,6 +195,13 @@ func NewGossipSubRPCValidationInspectorMetrics(prefix string) *GossipSubRpcValid
Help: "total number of times that the async inspection of iHave messages failed due to the number of duplicate topic ids exceeding the threshold",
})

gc.iHaveInvalidTopicIdExceedThresholdCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespaceNetwork,
Subsystem: subsystemGossip,
Name: gc.prefix + "rpc_inspection_ihave_invalid_topic_ids_exceed_threshold_total",
Help: "total number of times that the async inspection of iHave messages failed due to the number of invalid topic ids exceeding the threshold",
})

gc.iWantDuplicateMessageIdHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespaceNetwork,
Subsystem: subsystemGossip,
Expand Down Expand Up @@ -247,13 +268,28 @@ func NewGossipSubRPCValidationInspectorMetrics(prefix string) *GossipSubRpcValid
Help: "number of duplicate topic ids on graft messages of a single RPC during the async inspection, regardless of the result of the inspection",
})

gc.graftInvalidTopicIdsHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespaceNetwork,
Subsystem: subsystemGossip,
Name: gc.prefix + "rpc_inspection_graft_invalid_topic_ids_count",
Buckets: []float64{1, 100, 1000},
Help: "number of invalid topic ids on graft messages of a single RPC during the async inspection, regardless of the result of the inspection",
})

gc.graftDuplicateTopicIdsExceedThresholdCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespaceNetwork,
Subsystem: subsystemGossip,
Name: gc.prefix + "rpc_inspection_graft_duplicate_topic_ids_exceed_threshold_total",
Help: "number of times that the async inspection of graft messages of an rpc failed due to the number of duplicate topic ids exceeding the threshold",
})

gc.graftInvalidTopicIdsExceedThresholdCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespaceNetwork,
Subsystem: subsystemGossip,
Name: gc.prefix + "rpc_inspection_graft_invalid_topic_ids_exceed_threshold_total",
Help: "number of times that the async inspection of graft messages of an rpc failed due to the number of invalid topic ids exceeding the threshold",
})

gc.pruneDuplicateTopicIdsHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespaceNetwork,
Subsystem: subsystemGossip,
Expand All @@ -262,13 +298,28 @@ func NewGossipSubRPCValidationInspectorMetrics(prefix string) *GossipSubRpcValid
Help: "number of duplicate topic ids on prune messages of a single RPC during the async inspection, regardless of the result of the inspection",
})

gc.pruneInvalidTopicIdsHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespaceNetwork,
Subsystem: subsystemGossip,
Buckets: []float64{1, 100, 1000},
Name: gc.prefix + "rpc_inspection_prune_invalid_topic_ids_count",
Help: "number of invalid topic ids on prune messages of a single RPC during the async inspection, regardless of the result of the inspection",
})

gc.pruneDuplicateTopicIdsExceedThresholdCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespaceNetwork,
Subsystem: subsystemGossip,
Name: gc.prefix + "rpc_inspection_prune_duplicate_topic_ids_exceed_threshold_total",
Help: "number of times that the async inspection of prune messages failed due to the number of duplicate topic ids exceeding the threshold",
})

gc.pruneInvalidTopicIdsExceedThresholdCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespaceNetwork,
Subsystem: subsystemGossip,
Name: gc.prefix + "rpc_inspection_prune_invalid_topic_ids_exceed_threshold_total",
Help: "number of times that the async inspection of prune messages failed due to the number of invalid topic ids exceeding the threshold",
})

gc.publishMessageInspectedErrHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespaceNetwork,
Subsystem: subsystemGossip,
Expand Down Expand Up @@ -323,7 +374,7 @@ func (c *GossipSubRpcValidationInspectorMetrics) AsyncProcessingFinished(duratio
c.rpcCtrlMsgAsyncProcessingTimeHistogram.Observe(duration.Seconds())
}

// OnControlMessageIDsTruncated tracks the number of times a control message was truncated.
// OnControlMessagesTruncated tracks the number of times a control message was truncated.
// Args:
//
// messageType: the type of the control message that was truncated
Expand Down Expand Up @@ -407,9 +458,11 @@ func (c *GossipSubRpcValidationInspectorMetrics) OnIWantCacheMissMessageIdsExcee
//
// duplicateTopicIds: the total number of duplicate topic ids received by the node on the iHave messages at the end of the async inspection of the RPC.
// duplicateMessageIds: the number of duplicate message ids received by the node on the iHave messages at the end of the async inspection of the RPC.
func (c *GossipSubRpcValidationInspectorMetrics) OnIHaveMessagesInspected(duplicateTopicIds int, duplicateMessageIds int) {
// invalidTopicIds: the number of invalid topic ids received by the node on the iHave messages at the end of the async inspection of the RPC.
func (c *GossipSubRpcValidationInspectorMetrics) OnIHaveMessagesInspected(duplicateTopicIds, duplicateMessageIds, invalidTopicIds int) {
c.iHaveDuplicateTopicIdHistogram.Observe(float64(duplicateTopicIds))
c.iHaveDuplicateMessageIdHistogram.Observe(float64(duplicateMessageIds))
c.iHaveInvalidTopicIdHistogram.Observe(float64(invalidTopicIds))
}

// OnIHaveDuplicateTopicIdsExceedThreshold tracks the number of times that the async inspection of iHave messages of a single RPC failed due to the total number of duplicate topic ids
Expand All @@ -424,6 +477,12 @@ func (c *GossipSubRpcValidationInspectorMetrics) OnIHaveDuplicateMessageIdsExcee
c.iHaveDuplicateMessageIdExceedThresholdCount.Inc()
}

// OnIHaveInvalidTopicIdsExceedThreshold tracks the number of times that the async inspection of iHave messages of a single RPC failed due to the total number of invalid topic ids
// received by the node on the iHave messages of that RPC exceeding the threshold, which results in a misbehaviour report.
// It takes no arguments and returns nothing; each call increments the iHaveInvalidTopicIdExceedThresholdCount counter.
func (c *GossipSubRpcValidationInspectorMetrics) OnIHaveInvalidTopicIdsExceedThreshold() {
c.iHaveInvalidTopicIdExceedThresholdCount.Inc()
}

// OnInvalidTopicIdDetectedForControlMessage tracks the number of times that the async inspection of a control message type on a single RPC failed due to an invalid topic id.
// Args:
// - messageType: the type of the control message on which the invalid topic id was detected.
Expand Down Expand Up @@ -455,12 +514,20 @@ func (c *GossipSubRpcValidationInspectorMetrics) OnPruneDuplicateTopicIdsExceedT
c.pruneDuplicateTopicIdsExceedThresholdCount.Inc()
}

// OnPruneInvalidTopicIdsExceedThreshold tracks the number of times that the async inspection of prune messages for an RPC failed due to the number of invalid topic ids
// received by the node on prune messages of the same RPC exceeding the threshold, which results in a misbehaviour report.
// It takes no arguments and returns nothing; each call increments the pruneInvalidTopicIdsExceedThresholdCount counter.
func (c *GossipSubRpcValidationInspectorMetrics) OnPruneInvalidTopicIdsExceedThreshold() {
c.pruneInvalidTopicIdsExceedThresholdCount.Inc()
}

// OnPruneMessageInspected is called at the end of the async inspection of prune messages of the RPC, regardless of the result of the inspection.
// Args:
//
// duplicateTopicIds: the number of duplicate topic ids received by the node on the prune messages of the RPC at the end of the async inspection prunes.
func (c *GossipSubRpcValidationInspectorMetrics) OnPruneMessageInspected(duplicateTopicIds int) {
// invalidTopicIds: the number of invalid topic ids received by the node on the prune messages at the end of the async inspection of the RPC.
func (c *GossipSubRpcValidationInspectorMetrics) OnPruneMessageInspected(duplicateTopicIds, invalidTopicIds int) {
c.pruneDuplicateTopicIdsHistogram.Observe(float64(duplicateTopicIds))
c.pruneInvalidTopicIdsHistogram.Observe(float64(invalidTopicIds))
}

// OnGraftDuplicateTopicIdsExceedThreshold tracks the number of times that the async inspection of a graft message failed due to the number of duplicate topic ids.
Expand All @@ -469,12 +536,20 @@ func (c *GossipSubRpcValidationInspectorMetrics) OnGraftDuplicateTopicIdsExceedT
c.graftDuplicateTopicIdsExceedThresholdCount.Inc()
}

// OnGraftInvalidTopicIdsExceedThreshold tracks the number of times that the async inspection of the graft messages of a single RPC failed due to the number of invalid topic ids
// received by the node on graft messages of the same RPC exceeding the threshold, which results in a misbehaviour report.
// It takes no arguments and returns nothing; each call increments the graftInvalidTopicIdsExceedThresholdCount counter.
func (c *GossipSubRpcValidationInspectorMetrics) OnGraftInvalidTopicIdsExceedThreshold() {
c.graftInvalidTopicIdsExceedThresholdCount.Inc()
}

// OnGraftMessageInspected is called at the end of the async inspection of graft messages of a single RPC, regardless of the result of the inspection.
// Args:
//
// duplicateTopicIds: the number of duplicate topic ids received by the node on the graft messages at the end of the async inspection of a single RPC.
func (c *GossipSubRpcValidationInspectorMetrics) OnGraftMessageInspected(duplicateTopicIds int) {
// invalidTopicIds: the number of invalid topic ids received by the node on the graft messages at the end of the async inspection of the RPC.
func (c *GossipSubRpcValidationInspectorMetrics) OnGraftMessageInspected(duplicateTopicIds, invalidTopicIds int) {
c.graftDuplicateTopicIdsHistogram.Observe(float64(duplicateTopicIds))
c.graftInvalidTopicIdsHistogram.Observe(float64(invalidTopicIds))
}

// OnPublishMessageInspected is called at the end of the async inspection of publish messages of a single RPC, regardless of the result of the inspection.
Expand Down
Loading

0 comments on commit 8f4ecc4

Please sign in to comment.