Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Networking] Optimizing GossipSub RPC Handling Memory Usage Through Asynchronous Subscription Updates #4988

Merged
merged 62 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
02b1d57
repackages spam record cache
yhassanzadeh13 Oct 4, 2023
086fdf8
repackages scoring
yhassanzadeh13 Oct 4, 2023
7c926e9
Revert "repackages scoring"
yhassanzadeh13 Oct 4, 2023
7a4cd44
Revert "repackages spam record cache"
yhassanzadeh13 Oct 4, 2023
ba89a09
replaces event-based topic update with regular intervals
yhassanzadeh13 Oct 4, 2023
9d3a0f3
wires subscription provider parameters to gossipsub builder
yhassanzadeh13 Oct 4, 2023
e817929
wires subscription provider parameters to libp2p builder
yhassanzadeh13 Oct 4, 2023
b162ff0
wires down the subscription config parameters
yhassanzadeh13 Oct 4, 2023
ca81493
wires flags and default configs for interval
yhassanzadeh13 Oct 4, 2023
77046a2
wires flags
yhassanzadeh13 Oct 4, 2023
2f4c179
merges master into branch
yhassanzadeh13 Oct 23, 2023
c4cd07a
lint fix
yhassanzadeh13 Oct 23, 2023
81cc007
fixes TestSubscriptionProvider_GetSubscribedTopics
yhassanzadeh13 Oct 23, 2023
6a7d5c2
wip
yhassanzadeh13 Oct 23, 2023
a832465
wip
yhassanzadeh13 Oct 23, 2023
e550b6e
implements subscription record entity
yhassanzadeh13 Oct 24, 2023
284506c
wip
yhassanzadeh13 Oct 24, 2023
22c0f89
refines subscription record cache
yhassanzadeh13 Oct 25, 2023
d1a4bf9
adds component lifecycle to subscription validator, provider and option
yhassanzadeh13 Oct 26, 2023
801640b
Merge branch 'master' into yahya/6870-fix-memory-intensive-issues-part-1
yhassanzadeh13 Nov 9, 2023
0c533a7
fixes circular dependency between GossipSub and inspector
yhassanzadeh13 Nov 9, 2023
b105b36
lint fix
yhassanzadeh13 Nov 9, 2023
594748d
updates mocks
yhassanzadeh13 Nov 9, 2023
f3a3581
build fix
yhassanzadeh13 Nov 9, 2023
3af7376
makes app-specific score registry startable
yhassanzadeh13 Nov 9, 2023
a0ff8ec
adds startup hierarchy to score option
yhassanzadeh13 Nov 9, 2023
e72cd25
fixes all tests
yhassanzadeh13 Nov 9, 2023
55cbb6e
Merge remote-tracking branch 'origin/master' into yahya/6870-fix-memo…
yhassanzadeh13 Nov 9, 2023
f253b09
adds duration to the startup log
yhassanzadeh13 Nov 10, 2023
3a86983
adds TestNewSubscriptionRecordCache
yhassanzadeh13 Nov 10, 2023
38140d8
adds TestGetSubscribedTopics
yhassanzadeh13 Nov 10, 2023
398c031
adds TestDuplicateTopics
yhassanzadeh13 Nov 10, 2023
3c41a1a
adds test move update cycle
yhassanzadeh13 Nov 10, 2023
6b60162
adds TestSubscriptionValidator_Integration
yhassanzadeh13 Nov 10, 2023
863c937
adds TestMoveUpdateCycleWithDifferentPeers
yhassanzadeh13 Nov 10, 2023
1a6d388
updates mocks
yhassanzadeh13 Nov 10, 2023
230d20f
fixes build errors in insecure package
yhassanzadeh13 Nov 10, 2023
864f009
Merge branch 'master' into yahya/6870-fix-memory-intensive-issues-part-1
yhassanzadeh13 Nov 10, 2023
b310a64
adds TestSubscriptionProvider_GetSubscribedTopics_SkippingUnknownPeers
yhassanzadeh13 Nov 10, 2023
0fca12d
Merge branch 'master' into yahya/6870-fix-memory-intensive-issues-part-1
yhassanzadeh13 Nov 14, 2023
4c73a92
fixes merge conflicts
yhassanzadeh13 Nov 14, 2023
4a473be
lint fix
yhassanzadeh13 Nov 14, 2023
0e003ca
lint fix
yhassanzadeh13 Nov 14, 2023
e13a94f
Merge branch 'master' into yahya/6870-fix-memory-intensive-issues-part-1
yhassanzadeh13 Nov 14, 2023
443bf8a
consolidates PeerIdFixtureB with PeerIdFixture
yhassanzadeh13 Nov 14, 2023
08bf30e
adds lint and tidy to insecure package
yhassanzadeh13 Nov 14, 2023
9b1a4cc
lint fix
yhassanzadeh13 Nov 14, 2023
cc27bad
Revert "lint fix"
yhassanzadeh13 Nov 14, 2023
99bc1fd
lint fix
yhassanzadeh13 Nov 14, 2023
f7d1eef
Merge branch 'master' into yahya/6870-fix-memory-intensive-issues-part-1
yhassanzadeh13 Nov 14, 2023
f34bc32
Merge remote-tracking branch 'origin/master' into yahya/6870-fix-memo…
yhassanzadeh13 Nov 15, 2023
48f255f
moving ready to the select-case
yhassanzadeh13 Nov 16, 2023
e3525d3
moving ready to select-case
yhassanzadeh13 Nov 16, 2023
a46c466
revises the error by update loop to make it irrecoverable
yhassanzadeh13 Nov 16, 2023
d83fab5
adds select case for startup of subscription provider
yhassanzadeh13 Nov 16, 2023
c0d7e07
Update network/p2p/scoring/subscription_provider.go
yhassanzadeh13 Nov 16, 2023
ddfcc9f
adds documentation to subscription record cache
yhassanzadeh13 Nov 16, 2023
a433ec1
Merge branch 'yahya/6870-fix-memory-intensive-issues-part-1' of githu…
yhassanzadeh13 Nov 16, 2023
ba7b727
Merge branch 'master' into yahya/6870-fix-memory-intensive-issues-part-1
yhassanzadeh13 Nov 16, 2023
db47e4d
fixes a bug with subscription record id
yhassanzadeh13 Nov 16, 2023
4853b4c
Merge branch 'yahya/6870-fix-memory-intensive-issues-part-1' of githu…
yhassanzadeh13 Nov 16, 2023
f8daccd
adds more documentation for current cycle
yhassanzadeh13 Nov 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1563,6 +1563,7 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
UpdateInterval: builder.FlowConfig.NetworkConfig.PeerUpdateInterval,
ConnectorFactory: connection.DefaultLibp2pBackoffConnectorFactory(),
},
&builder.FlowConfig.NetworkConfig.GossipSubConfig.SubscriptionProviderConfig,
&p2p.DisallowListCacheConfig{
MaxSize: builder.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
Expand Down
1 change: 1 addition & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
&builder.FlowConfig.NetworkConfig.ResourceManager,
&builder.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubRPCInspectorsConfig,
p2pconfig.PeerManagerDisableConfig(), // disable peer manager for observer node.
&builder.FlowConfig.NetworkConfig.GossipSubConfig.SubscriptionProviderConfig,
&p2p.DisallowListCacheConfig{
MaxSize: builder.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
Expand Down
9 changes: 9 additions & 0 deletions config/default-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ network-config:
gossipsub-rpc-sent-tracker-workers: 5
# Peer scoring is the default value for enabling peer scoring
gossipsub-peer-scoring-enabled: true
# The interval for updating the list of subscribed peers to all topics in gossipsub. This is used to keep track of subscriptions
# violations and penalize peers accordingly. Recommended value is in the order of a few minutes to avoid contentions; as the operation
# reads all topics and all peers subscribed to each topic.
gossipsub-subscription-provider-update-interval: 10m
# The size of cache for keeping the list of all peers subscribed to each topic (same as the local node). This cache is the local node's
# view of the network and is used to detect subscription violations and penalize peers accordingly. Recommended to be big enough to
# keep the entire network's size. Otherwise, the local node's view of the network will be incomplete due to cache eviction.
# Recommended size is 10x the number of peers in the network.
gossipsub-subscription-provider-cache-size: 10000

# Gossipsub rpc inspectors configs
# The size of the queue for notifications about invalid RPC messages
Expand Down
1 change: 1 addition & 0 deletions follower/follower_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ func (builder *FollowerServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
&builder.FlowConfig.NetworkConfig.ResourceManager,
&builder.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubRPCInspectorsConfig,
p2pconfig.PeerManagerDisableConfig(), // disable peer manager for follower
&builder.FlowConfig.NetworkConfig.GossipSubConfig.SubscriptionProviderConfig,
&p2p.DisallowListCacheConfig{
MaxSize: builder.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
Expand Down
15 changes: 15 additions & 0 deletions insecure/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,18 @@ endif
.PHONY: test
test:
go test $(if $(VERBOSE),-v,) -coverprofile=$(COVER_PROFILE) $(RACE_FLAG) $(if $(JSON_OUTPUT),-json,) $(if $(NUM_RUNS),-count $(NUM_RUNS),) --tags relic ./...

.PHONY: lint
lint: tidy
# revive -config revive.toml -exclude storage/ledger/trie ./...
golangci-lint run -v --build-tags relic ./...

# this ensures there is no unused dependency being added by accident
.PHONY: tidy
tidy:
go mod tidy -v
cd integration; go mod tidy -v
cd crypto; go mod tidy -v
cd cmd/testclient; go mod tidy -v
cd insecure; go mod tidy -v
git diff --exit-code

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions module/metrics/herocache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func NetworkReceiveCacheMetricsFactory(f HeroCacheMetricsFactory, networkType ne
return f(namespaceNetwork, r)
}

func NewSubscriptionRecordCacheMetricsFactory(f HeroCacheMetricsFactory) module.HeroCacheMetrics {
return f(namespaceNetwork, ResourceNetworkingSubscriptionRecordsCache)
}

// DisallowListCacheMetricsFactory is the factory method for creating a new HeroCacheCollector for the disallow list cache.
// The disallow-list cache is used to keep track of peers that are disallow-listed and the reasons for it.
// Args:
Expand Down
1 change: 1 addition & 0 deletions module/metrics/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
ResourceEpochCommit = "epoch_commit"
ResourceEpochStatus = "epoch_status"
ResourceNetworkingReceiveCache = "networking_received_message" // networking layer
ResourceNetworkingSubscriptionRecordsCache = "subscription_records_cache" // networking layer
ResourceNetworkingDnsIpCache = "networking_dns_ip_cache" // networking layer
ResourceNetworkingDnsTxtCache = "networking_dns_txt_cache" // networking layer
ResourceNetworkingDisallowListNotificationQueue = "networking_disallow_list_notification_queue"
Expand Down
1 change: 0 additions & 1 deletion network/alsp/internal/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func (s *SpamRecordCache) Adjust(originId flow.Identifier, adjustFunc model.Reco
penalty, err := s.adjust(originId, adjustFunc)

switch {

case err == ErrSpamRecordNotFound:
// if the record does not exist, we initialize the record and try to adjust it again.
// Note: there is an edge case where the record is initialized by another goroutine between the two calls.
Expand Down
1 change: 1 addition & 0 deletions network/internal/p2pfixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func CreateNode(t *testing.T, networkKey crypto.PrivateKey, sporkID flow.Identif
&defaultFlowConfig.NetworkConfig.ResourceManager,
&defaultFlowConfig.NetworkConfig.GossipSubRPCInspectorsConfig,
p2pconfig.PeerManagerDisableConfig(),
&defaultFlowConfig.NetworkConfig.GossipSubConfig.SubscriptionProviderConfig,
&p2p.DisallowListCacheConfig{
MaxSize: uint32(1000),
Metrics: metrics.NewNoopCollector(),
Expand Down
49 changes: 36 additions & 13 deletions network/netconf/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ const (
rpcSentTrackerNumOfWorkers = "gossipsub-rpc-sent-tracker-workers"
scoreTracerInterval = "gossipsub-score-tracer-interval"

gossipSubSubscriptionProviderUpdateInterval = "gossipsub-subscription-provider-update-interval"
gossipSubSubscriptionProviderCacheSize = "gossipsub-subscription-provider-cache-size"

// gossipsub validation inspector
gossipSubRPCInspectorNotificationCacheSize = "gossipsub-rpc-inspector-notification-cache-size"
validationInspectorNumberOfWorkers = "gossipsub-rpc-validation-inspector-workers"
Expand Down Expand Up @@ -171,9 +174,13 @@ func AllFlagNames() []string {
func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) {
flags.Bool(networkingConnectionPruning, config.NetworkConnectionPruning, "enabling connection trimming")
flags.Duration(dnsCacheTTL, config.DNSCacheTTL, "time-to-live for dns cache")
flags.StringSlice(preferredUnicastsProtocols, config.PreferredUnicastProtocols, "preferred unicast protocols in ascending order of preference")
flags.StringSlice(
preferredUnicastsProtocols, config.PreferredUnicastProtocols, "preferred unicast protocols in ascending order of preference")
flags.Uint32(receivedMessageCacheSize, config.NetworkReceivedMessageCacheSize, "incoming message cache size at networking layer")
flags.Uint32(disallowListNotificationCacheSize, config.DisallowListNotificationCacheSize, "cache size for notification events from disallow list")
flags.Uint32(
disallowListNotificationCacheSize,
config.DisallowListNotificationCacheSize,
"cache size for notification events from disallow list")
flags.Duration(peerUpdateInterval, config.PeerUpdateInterval, "how often to refresh the peer connections for the node")
flags.Duration(unicastMessageTimeout, config.UnicastMessageTimeout, "how long a unicast transmission can take to complete")
// unicast manager options
Expand Down Expand Up @@ -209,10 +216,22 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) {
flags.Duration(silencePeriod, config.ConnectionManagerConfig.SilencePeriod, "silence period for libp2p connection manager")
flags.Bool(peerScoring, config.GossipSubConfig.PeerScoring, "enabling peer scoring on pubsub network")
flags.Duration(localMeshLogInterval, config.GossipSubConfig.LocalMeshLogInterval, "logging interval for local mesh in gossipsub")
flags.Duration(scoreTracerInterval, config.GossipSubConfig.ScoreTracerInterval, "logging interval for peer score tracer in gossipsub, set to 0 to disable")
flags.Uint32(rpcSentTrackerCacheSize, config.GossipSubConfig.RPCSentTrackerCacheSize, "cache size of the rpc sent tracker used by the gossipsub mesh tracer.")
flags.Uint32(rpcSentTrackerQueueCacheSize, config.GossipSubConfig.RPCSentTrackerQueueCacheSize, "cache size of the rpc sent tracker worker queue.")
flags.Int(rpcSentTrackerNumOfWorkers, config.GossipSubConfig.RpcSentTrackerNumOfWorkers, "number of workers for the rpc sent tracker worker pool.")
flags.Duration(
scoreTracerInterval,
config.GossipSubConfig.ScoreTracerInterval,
"logging interval for peer score tracer in gossipsub, set to 0 to disable")
flags.Uint32(
rpcSentTrackerCacheSize,
config.GossipSubConfig.RPCSentTrackerCacheSize,
"cache size of the rpc sent tracker used by the gossipsub mesh tracer.")
flags.Uint32(
rpcSentTrackerQueueCacheSize,
config.GossipSubConfig.RPCSentTrackerQueueCacheSize,
"cache size of the rpc sent tracker worker queue.")
flags.Int(
rpcSentTrackerNumOfWorkers,
config.GossipSubConfig.RpcSentTrackerNumOfWorkers,
"number of workers for the rpc sent tracker worker pool.")
// gossipsub RPC control message validation limits used for validation configuration and rate limiting
flags.Int(validationInspectorNumberOfWorkers,
config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.NumberOfWorkers,
Expand Down Expand Up @@ -280,12 +299,15 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) {
config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.DuplicateMsgIDThreshold,
"max allowed duplicate message IDs in a single iWant control message")

flags.Int(rpcMessageMaxSampleSize,
config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.RpcMessageMaxSampleSize,
"the max sample size used for RPC message validation. If the total number of RPC messages exceeds this value a sample will be taken but messages will not be truncated")
flags.Int(rpcMessageErrorThreshold,
config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.RpcMessageErrorThreshold,
"the threshold at which an error will be returned if the number of invalid RPC messages exceeds this value")
flags.Int(rpcMessageMaxSampleSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.RpcMessageMaxSampleSize, "the max sample size used for RPC message validation. If the total number of RPC messages exceeds this value a sample will be taken but messages will not be truncated")
flags.Int(rpcMessageErrorThreshold, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.RpcMessageErrorThreshold, "the threshold at which an error will be returned if the number of invalid RPC messages exceeds this value")
flags.Duration(
gossipSubSubscriptionProviderUpdateInterval, config.GossipSubConfig.SubscriptionProviderConfig.SubscriptionUpdateInterval,
"interval for updating the list of subscribed topics for all peers in the gossipsub, recommended value is a few minutes")
flags.Uint32(
gossipSubSubscriptionProviderCacheSize,
config.GossipSubConfig.SubscriptionProviderConfig.CacheSize,
"size of the cache that keeps the list of topics each peer has subscribed to, recommended size is 10x the number of authorized nodes")
}

// LoadLibP2PResourceManagerFlags loads all CLI flags for the libp2p resource manager configuration on the provided pflag set.
Expand Down Expand Up @@ -359,7 +381,8 @@ func SetAliases(conf *viper.Viper) error {
for _, flagName := range AllFlagNames() {
fullKey, ok := m[flagName]
if !ok {
return fmt.Errorf("invalid network configuration missing configuration key flag name %s check config file and cli flags", flagName)
return fmt.Errorf(
"invalid network configuration missing configuration key flag name %s check config file and cli flags", flagName)
}
conf.RegisterAlias(fullKey, flagName)
}
Expand Down
1 change: 1 addition & 0 deletions network/p2p/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type GossipSubRpcInspectorSuiteFactoryFunc func(
metrics.HeroCacheMetricsFactory,
flownet.NetworkingType,
module.IdentityProvider,
func() TopicProvider,
) (GossipSubInspectorSuite, error)

// NodeBuilder is a builder pattern for creating a libp2p Node instance.
Expand Down
7 changes: 0 additions & 7 deletions network/p2p/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,4 @@ type GossipSubInspectorSuite interface {
// pattern where the consumer is notified when a new notification is published.
// A consumer is only notified once for each notification, and only receives notifications that were published after it was added.
AddInvalidControlMessageConsumer(GossipSubInvCtrlMsgNotifConsumer)

// SetTopicOracle sets the topic oracle of the gossipsub inspector suite.
// The topic oracle is used to determine the list of topics that the node is subscribed to.
// If an oracle is not set, the node will not be able to determine the list of topics that the node is subscribed to.
// This func is expected to be called once and will return an error on all subsequent calls.
// All errors returned from this func are considered irrecoverable.
SetTopicOracle(topicOracle func() []string) error
}
35 changes: 35 additions & 0 deletions network/p2p/inspector/internal/mockTopicProvider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package internal

import (
"github.com/libp2p/go-libp2p/core/peer"
)

// MockUpdatableTopicProvider is a mock implementation of the TopicProvider interface.
// TODO: this should be moved to a common package (e.g. network/p2p/test). Currently, it is not possible to do so because of a circular dependency.
type MockUpdatableTopicProvider struct {
topics []string
subscriptions map[string][]peer.ID
}

func NewMockUpdatableTopicProvider() *MockUpdatableTopicProvider {
return &MockUpdatableTopicProvider{
topics: []string{},
subscriptions: map[string][]peer.ID{},
}
}

func (m *MockUpdatableTopicProvider) GetTopics() []string {
return m.topics
}

func (m *MockUpdatableTopicProvider) ListPeers(topic string) []peer.ID {
return m.subscriptions[topic]
}

func (m *MockUpdatableTopicProvider) UpdateTopics(topics []string) {
m.topics = topics
}

func (m *MockUpdatableTopicProvider) UpdateSubscriptions(topic string, peers []peer.ID) {
m.subscriptions[topic] = peers
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type ControlMsgValidationInspector struct {
// networkingType indicates public or private network, rpc publish messages are inspected for unstaked senders when running the private network.
networkingType network.NetworkingType
// topicOracle callback used to retrieve the current subscribed topics of the libp2p node.
topicOracle func() []string
topicOracle func() p2p.TopicProvider
}

type InspectorParams struct {
Expand All @@ -82,6 +82,9 @@ type InspectorParams struct {
RpcTracker p2p.RpcControlTracking `validate:"required"`
// NetworkingType the networking type of the node.
NetworkingType network.NetworkingType `validate:"required"`
// TopicOracle callback used to retrieve the current subscribed topics of the libp2p node.
// It is set as a callback to avoid circular dependencies between the topic oracle and the inspector.
TopicOracle func() p2p.TopicProvider `validate:"required"`
}

var _ component.Component = (*ControlMsgValidationInspector)(nil)
Expand All @@ -105,13 +108,18 @@ func NewControlMsgValidationInspector(params *InspectorParams) (*ControlMsgValid
inspectMsgQueueCacheCollector := metrics.GossipSubRPCInspectorQueueMetricFactory(params.HeroCacheMetricsFactory, params.NetworkingType)
clusterPrefixedCacheCollector := metrics.GossipSubRPCInspectorClusterPrefixedCacheMetricFactory(params.HeroCacheMetricsFactory, params.NetworkingType)

clusterPrefixedTracker, err := cache.NewClusterPrefixedMessagesReceivedTracker(params.Logger, params.Config.ClusterPrefixedControlMsgsReceivedCacheSize, clusterPrefixedCacheCollector, params.Config.ClusterPrefixedControlMsgsReceivedCacheDecay)
clusterPrefixedTracker, err := cache.NewClusterPrefixedMessagesReceivedTracker(params.Logger,
params.Config.ClusterPrefixedControlMsgsReceivedCacheSize,
clusterPrefixedCacheCollector,
params.Config.ClusterPrefixedControlMsgsReceivedCacheDecay)
if err != nil {
return nil, fmt.Errorf("failed to create cluster prefix topics received tracker")
}

if params.Config.RpcMessageMaxSampleSize < params.Config.RpcMessageErrorThreshold {
return nil, fmt.Errorf("rpc message max sample size must be greater than or equal to rpc message error threshold, got %d and %d respectively", params.Config.RpcMessageMaxSampleSize, params.Config.RpcMessageErrorThreshold)
return nil, fmt.Errorf("rpc message max sample size must be greater than or equal to rpc message error threshold, got %d and %d respectively",
params.Config.RpcMessageMaxSampleSize,
params.Config.RpcMessageErrorThreshold)
}

c := &ControlMsgValidationInspector{
Expand All @@ -124,6 +132,7 @@ func NewControlMsgValidationInspector(params *InspectorParams) (*ControlMsgValid
idProvider: params.IdProvider,
metrics: params.InspectorMetrics,
networkingType: params.NetworkingType,
topicOracle: params.TopicOracle,
}

store := queue.NewHeroStore(params.Config.CacheSize, params.Logger, inspectMsgQueueCacheCollector)
Expand Down Expand Up @@ -167,18 +176,6 @@ func (c *ControlMsgValidationInspector) ActiveClustersChanged(clusterIDList flow
c.tracker.StoreActiveClusterIds(clusterIDList)
}

// SetTopicOracle Sets the topic oracle. The topic oracle is used to determine the list of topics that the node is subscribed to.
// If an oracle is not set, the node will not be able to determine the list of topics that the node is subscribed to.
// This func is expected to be called once and will return an error on all subsequent calls.
// All errors returned from this func are considered irrecoverable.
func (c *ControlMsgValidationInspector) SetTopicOracle(topicOracle func() []string) error {
if c.topicOracle != nil {
return fmt.Errorf("topic oracle already set")
}
c.topicOracle = topicOracle
return nil
}

// Inspect is called by gossipsub upon reception of a rpc from a remote node.
// It creates a new InspectRPCRequest for the RPC to be inspected async by the worker pool.
// Args:
Expand Down Expand Up @@ -469,7 +466,7 @@ func (c *ControlMsgValidationInspector) inspectRpcPublishMessages(from peer.ID,
messages[i], messages[j] = messages[j], messages[i]
})

subscribedTopics := c.topicOracle()
subscribedTopics := c.topicOracle().GetTopics()
hasSubscription := func(topic string) bool {
for _, subscribedTopic := range subscribedTopics {
if topic == subscribedTopic {
Expand Down
Loading
Loading