From 0ffbea9276bc02c250488c1ba07d793c7f5c122c Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 21 Jun 2023 16:55:26 +0800 Subject: [PATCH 1/4] pkg/sink(ticdc): Update Kafka cluster every 30min Signed-off-by: hi-rustin --- pkg/sink/kafka/metrics_collector.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/pkg/sink/kafka/metrics_collector.go b/pkg/sink/kafka/metrics_collector.go index 8149369765b..ea83b69c1fb 100644 --- a/pkg/sink/kafka/metrics_collector.go +++ b/pkg/sink/kafka/metrics_collector.go @@ -30,8 +30,12 @@ type MetricsCollector interface { Run(ctx context.Context) } -// RefreshMetricsInterval specifies the interval of refresh kafka client metrics. -const RefreshMetricsInterval = 5 * time.Second +const ( + // RefreshMetricsInterval specifies the interval of refresh kafka client metrics. + RefreshMetricsInterval = 5 * time.Second + // RefreshClusterMetaInterval specifies the interval of refresh kafka cluster meta. + RefreshClusterMetaInterval = 30 * time.Minute +) // Sarama metrics names, see https://pkg.go.dev/github.com/Shopify/sarama#pkg-overview. const ( @@ -71,9 +75,13 @@ func NewSaramaMetricsCollector( } func (m *saramaMetricsCollector) Run(ctx context.Context) { - ticker := time.NewTicker(RefreshMetricsInterval) + // Initialize brokers. + m.updateBrokers(ctx) + + refreshMetricsTicker := time.NewTicker(RefreshMetricsInterval) + refreshClusterMetaTicker := time.NewTicker(RefreshClusterMetaInterval) defer func() { - ticker.Stop() + refreshMetricsTicker.Stop() m.cleanupMetrics() }() @@ -84,10 +92,11 @@ func (m *saramaMetricsCollector) Run(ctx context.Context) { zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID)) return - case <-ticker.C: - m.updateBrokers(ctx) + case <-refreshMetricsTicker.C: m.collectBrokerMetrics() m.collectProducerMetrics() + case <-refreshClusterMetaTicker.C: + m.updateBrokers(ctx) } } } From 61a8a8077a7f4c08333333ac5dec49635d43e6d8 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 21 Jun 2023 17:01:56 +0800 Subject: [PATCH 2/4] pkg/sink(ticdc): narrow scope Signed-off-by: hi-rustin --- pkg/sink/kafka/metrics_collector.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/sink/kafka/metrics_collector.go b/pkg/sink/kafka/metrics_collector.go index ea83b69c1fb..215b2aade07 100644 --- a/pkg/sink/kafka/metrics_collector.go +++ b/pkg/sink/kafka/metrics_collector.go @@ -33,8 +33,8 @@ type MetricsCollector interface { const ( // RefreshMetricsInterval specifies the interval of refresh kafka client metrics. RefreshMetricsInterval = 5 * time.Second - // RefreshClusterMetaInterval specifies the interval of refresh kafka cluster meta. - RefreshClusterMetaInterval = 30 * time.Minute + // refreshClusterMetaInterval specifies the interval of refresh kafka cluster meta. + refreshClusterMetaInterval = 30 * time.Minute ) // Sarama metrics names, see https://pkg.go.dev/github.com/Shopify/sarama#pkg-overview. @@ -79,7 +79,7 @@ func (m *saramaMetricsCollector) Run(ctx context.Context) { m.updateBrokers(ctx) refreshMetricsTicker := time.NewTicker(RefreshMetricsInterval) - refreshClusterMetaTicker := time.NewTicker(RefreshClusterMetaInterval) + refreshClusterMetaTicker := time.NewTicker(refreshClusterMetaInterval) defer func() { refreshMetricsTicker.Stop() m.cleanupMetrics() From aeee74a272fe4716e19364eca00d3b2d2d0bf0ad Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 21 Jun 2023 17:03:46 +0800 Subject: [PATCH 3/4] pkg/sink(ticdc): better comment Signed-off-by: hi-rustin --- pkg/sink/kafka/metrics_collector.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/sink/kafka/metrics_collector.go b/pkg/sink/kafka/metrics_collector.go index 215b2aade07..854f7a85790 100644 --- a/pkg/sink/kafka/metrics_collector.go +++ b/pkg/sink/kafka/metrics_collector.go @@ -34,6 +34,8 @@ const ( // RefreshMetricsInterval specifies the interval of refresh kafka client metrics. RefreshMetricsInterval = 5 * time.Second // refreshClusterMetaInterval specifies the interval of refresh kafka cluster meta. + // Do not set it too small, because it will cause too many requests to kafka cluster. + // Every request will get all topics and all brokers information. refreshClusterMetaInterval = 30 * time.Minute ) From 5485ddf8bde793a52f76bb23733fa9bf349d752e Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 21 Jun 2023 17:25:31 +0800 Subject: [PATCH 4/4] pkg/sink(ticdc): stop ticker Signed-off-by: hi-rustin --- pkg/sink/kafka/metrics_collector.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/sink/kafka/metrics_collector.go b/pkg/sink/kafka/metrics_collector.go index 854f7a85790..961a0d973dc 100644 --- a/pkg/sink/kafka/metrics_collector.go +++ b/pkg/sink/kafka/metrics_collector.go @@ -84,6 +84,7 @@ func (m *saramaMetricsCollector) Run(ctx context.Context) { refreshClusterMetaTicker := time.NewTicker(refreshClusterMetaInterval) defer func() { refreshMetricsTicker.Stop() + refreshClusterMetaTicker.Stop() m.cleanupMetrics() }()