pkg/sink(ticdc): Update Kafka cluster every 30min (#9287) (#9292)
close #8959
ti-chi-bot authored Jun 30, 2023
1 parent 1677113 commit 2b80a47
Showing 2 changed files with 36 additions and 13 deletions.
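Both files change in the same way: the metrics loop previously drove everything off a single 5-second ticker, so the Kafka broker list was re-fetched on every metrics flush. After this commit the loop keeps the 5-second ticker for flushing sarama metrics and adds a second, 30-minute ticker that refreshes the cluster metadata (the expensive call). The following is a minimal sketch of that two-ticker pattern; the monitor struct and its collectMetrics/refreshBrokers methods are hypothetical stand-ins for illustration, not the actual TiCDC types changed below.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    const (
        flushMetricsInterval       = 5 * time.Second
        refreshClusterMetaInterval = 30 * time.Minute
    )

    // monitor is a hypothetical stand-in for the sarama metrics monitor /
    // collector types touched by this commit.
    type monitor struct{}

    func (m *monitor) collectMetrics() { fmt.Println("flush sarama metrics") }

    func (m *monitor) refreshBrokers() { fmt.Println("refresh kafka cluster meta") }

    // run drives the two tickers: cheap metric flushes every few seconds,
    // expensive cluster-metadata refreshes only every 30 minutes.
    func (m *monitor) run(ctx context.Context) {
        // Fetch the broker list once up front so the first metric flushes
        // have something to report against.
        m.refreshBrokers()

        refreshMetricsTicker := time.NewTicker(flushMetricsInterval)
        refreshClusterMetaTicker := time.NewTicker(refreshClusterMetaInterval)
        defer func() {
            refreshMetricsTicker.Stop()
            refreshClusterMetaTicker.Stop()
        }()

        for {
            select {
            case <-ctx.Done():
                return
            case <-refreshMetricsTicker.C:
                m.collectMetrics()
            case <-refreshClusterMetaTicker.C:
                m.refreshBrokers()
            }
        }
    }

    func main() {
        // Run the loop briefly so the example terminates on its own.
        ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
        defer cancel()
        (&monitor{}).run(ctx)
    }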
25 changes: 18 additions & 7 deletions cdc/sink/mq/producer/kafka/metrics.go
@@ -246,8 +246,6 @@ func (sm *saramaMetricsMonitor) collectBrokers() {
 }
 
 func (sm *saramaMetricsMonitor) collectBrokerMetrics() {
-    sm.collectBrokers()
-
     for id := range sm.brokers {
         brokerID := strconv.Itoa(int(id))
 
@@ -309,8 +307,14 @@ func (sm *saramaMetricsMonitor) collectBrokerMetrics() {
     }
 }
 
-// flushMetricsInterval specifies the interval of refresh sarama metrics.
-const flushMetricsInterval = 5 * time.Second
+const (
+    // flushMetricsInterval specifies the interval of refresh sarama metrics.
+    flushMetricsInterval = 5 * time.Second
+    // refreshClusterMetaInterval specifies the interval of refresh kafka cluster meta.
+    // Do not set it too small, because it will cause too many requests to kafka cluster.
+    // Every request will get all topics and all brokers information.
+    refreshClusterMetaInterval = 30 * time.Minute
+)
 
 func runSaramaMetricsMonitor(ctx context.Context,
     registry metrics.Registry,
@@ -325,18 +329,25 @@ func runSaramaMetricsMonitor(ctx context.Context,
         brokers: make(map[int32]struct{}),
     }
 
-    ticker := time.NewTicker(flushMetricsInterval)
+    // Initialize brokers.
+    monitor.collectBrokers()
+
+    refreshMetricsTicker := time.NewTicker(flushMetricsInterval)
+    refreshClusterMetaTicker := time.NewTicker(refreshClusterMetaInterval)
     go func() {
         defer func() {
-            ticker.Stop()
+            refreshMetricsTicker.Stop()
+            refreshClusterMetaTicker.Stop()
             monitor.cleanup()
         }()
         for {
             select {
             case <-ctx.Done():
                 return
-            case <-ticker.C:
+            case <-refreshMetricsTicker.C:
                 monitor.collectMetrics()
+            case <-refreshClusterMetaTicker.C:
+                monitor.collectBrokers()
             }
         }
     }()
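In metrics.go the call to collectBrokers() is dropped from collectBrokerMetrics(), so the 5-second flush path no longer touches cluster metadata at all: the broker list is populated once before the goroutine starts and is then refreshed only when refreshClusterMetaTicker fires.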
24 changes: 18 additions & 6 deletions cdc/sinkv2/metrics/mq/kafka/collector.go
@@ -26,8 +26,14 @@ import (
     "go.uber.org/zap"
 )
 
-// flushMetricsInterval specifies the interval of refresh sarama metrics.
-const flushMetricsInterval = 5 * time.Second
+const (
+    // flushMetricsInterval specifies the interval of refresh sarama metrics.
+    flushMetricsInterval = 5 * time.Second
+    // refreshClusterMetaInterval specifies the interval of refresh kafka cluster meta.
+    // Do not set it too small, because it will cause too many requests to kafka cluster.
+    // Every request will get all topics and all brokers information.
+    refreshClusterMetaInterval = 30 * time.Minute
+)
 
 // Sarama metrics names, see https://pkg.go.dev/github.com/Shopify/sarama#pkg-overview.
 const (
@@ -71,20 +77,26 @@ func New(
 // Run collects kafka metrics.
 // It will close the admin client when it's done.
 func (m *Collector) Run(ctx context.Context) {
-    ticker := time.NewTicker(flushMetricsInterval)
+    // Initialize brokers.
+    m.updateBrokers()
+
+    refreshMetricsTicker := time.NewTicker(flushMetricsInterval)
+    refreshClusterMetaTicker := time.NewTicker(refreshClusterMetaInterval)
     defer func() {
-        ticker.Stop()
+        refreshMetricsTicker.Stop()
+        refreshClusterMetaTicker.Stop()
         m.cleanupMetrics()
     }()
 
     for {
         select {
         case <-ctx.Done():
             return
-        case <-ticker.C:
-            m.updateBrokers()
+        case <-refreshMetricsTicker.C:
             m.collectBrokerMetrics()
             m.collectProducerMetrics()
+        case <-refreshClusterMetaTicker.C:
+            m.updateBrokers()
         }
     }
 }
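collector.go in the v2 sink gets the same treatment: Run() now calls updateBrokers() once up front and afterwards only on the 30-minute ticker, while the 5-second ticker keeps the cheap collectBrokerMetrics()/collectProducerMetrics() calls. The comment on refreshClusterMetaInterval explains the trade-off: each metadata request fetches all topics and all brokers, so refreshing too often would flood the Kafka cluster with requests.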
