Skip to content

Commit

Permalink
pkg/sink(ticdc): Update Kafka cluster every 30min (#9287) (#9293)
Browse files Browse the repository at this point in the history
close #8959
  • Loading branch information
ti-chi-bot authored Jun 27, 2023
1 parent ebabf06 commit 631ef8c
Showing 1 changed file with 18 additions and 6 deletions.
24 changes: 18 additions & 6 deletions pkg/sink/kafka/metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ type MetricsCollector interface {
Run(ctx context.Context)
}

// RefreshMetricsInterval specifies the interval of refresh kafka client metrics.
const RefreshMetricsInterval = 5 * time.Second
const (
// RefreshMetricsInterval specifies the interval of refresh kafka client metrics.
RefreshMetricsInterval = 5 * time.Second
// refreshClusterMetaInterval specifies the interval of refresh kafka cluster meta.
// Do not set it too small, because it will cause too many requests to kafka cluster.
// Every request will get all topics and all brokers information.
refreshClusterMetaInterval = 30 * time.Minute
)

// Sarama metrics names, see https://pkg.go.dev/github.com/Shopify/sarama#pkg-overview.
const (
Expand Down Expand Up @@ -71,20 +77,26 @@ func NewSaramaMetricsCollector(
}

func (m *saramaMetricsCollector) Run(ctx context.Context) {
ticker := time.NewTicker(RefreshMetricsInterval)
// Initialize brokers.
m.updateBrokers(ctx)

refreshMetricsTicker := time.NewTicker(RefreshMetricsInterval)
refreshClusterMetaTicker := time.NewTicker(refreshClusterMetaInterval)
defer func() {
ticker.Stop()
refreshMetricsTicker.Stop()
refreshClusterMetaTicker.Stop()
m.cleanupMetrics()
}()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
m.updateBrokers(ctx)
case <-refreshMetricsTicker.C:
m.collectBrokerMetrics()
m.collectProducerMetrics()
case <-refreshClusterMetaTicker.C:
m.updateBrokers(ctx)
}
}
}
Expand Down

0 comments on commit 631ef8c

Please sign in to comment.