diff --git a/pkg/sink/kafka/metrics_collector.go b/pkg/sink/kafka/metrics_collector.go index bba092d909b..2e8af946288 100644 --- a/pkg/sink/kafka/metrics_collector.go +++ b/pkg/sink/kafka/metrics_collector.go @@ -30,8 +30,14 @@ type MetricsCollector interface { Run(ctx context.Context) } -// RefreshMetricsInterval specifies the interval of refresh kafka client metrics. -const RefreshMetricsInterval = 5 * time.Second +const ( + // RefreshMetricsInterval specifies the interval of refresh kafka client metrics. + RefreshMetricsInterval = 5 * time.Second + // refreshClusterMetaInterval specifies the interval of refresh kafka cluster meta. + // Do not set it too small, because it will cause too many requests to kafka cluster. + // Every request will get all topics and all brokers information. + refreshClusterMetaInterval = 30 * time.Minute +) // Sarama metrics names, see https://pkg.go.dev/github.com/Shopify/sarama#pkg-overview. const ( @@ -71,9 +77,14 @@ func NewSaramaMetricsCollector( } func (m *saramaMetricsCollector) Run(ctx context.Context) { - ticker := time.NewTicker(RefreshMetricsInterval) + // Initialize brokers. + m.updateBrokers(ctx) + + refreshMetricsTicker := time.NewTicker(RefreshMetricsInterval) + refreshClusterMetaTicker := time.NewTicker(refreshClusterMetaInterval) defer func() { - ticker.Stop() + refreshMetricsTicker.Stop() + refreshClusterMetaTicker.Stop() m.cleanupMetrics() }() @@ -81,10 +92,11 @@ func (m *saramaMetricsCollector) Run(ctx context.Context) { select { case <-ctx.Done(): return - case <-ticker.C: - m.updateBrokers(ctx) + case <-refreshMetricsTicker.C: m.collectBrokerMetrics() m.collectProducerMetrics() + case <-refreshClusterMetaTicker.C: + m.updateBrokers(ctx) } } }