Skip to content

Commit

Permalink
cdc,regionspan(ticdc): more logs about eventfeed retry rate limit (#4008
Browse files Browse the repository at this point in the history
)
  • Loading branch information
overvenus committed Dec 28, 2021
1 parent 5a6fc5c commit 14c9b1e
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,8 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
}
})

tableID, tableName := util.TableIDFromCtx(ctx)
cfID := util.ChangefeedIDFromCtx(ctx)
g.Go(func() error {
checkRateLimitInterval := 50 * time.Millisecond
timer := time.NewTimer(checkRateLimitInterval)
Expand All @@ -546,7 +548,13 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
s.errChSizeGauge.Dec()
allowed := s.checkRateLimit(errInfo.singleRegionInfo.verID.GetID())
if !allowed {
// rate limit triggers, add the error info to the rate limit queue
// rate limit triggers, add the error info to the rate limit queue.
log.Info("EventFeed retry rate limited",
zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()),
zap.Uint64("ts", errInfo.singleRegionInfo.ts),
zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span),
zap.Int64("tableID", tableID), zap.String("tableName", tableName),
zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr))
s.rateLimitQueue = append(s.rateLimitQueue, errInfo)
} else {
err := s.handleError(ctx, errInfo)
Expand Down Expand Up @@ -993,15 +1001,11 @@ func (s *eventFeedSession) handleRateLimit(ctx context.Context) {
}

// checkRateLimit checks whether a region can be reconnected based on its rate limiter
func (s *eventFeedSession) checkRateLimit(regionID uint64) (allowed bool) {
func (s *eventFeedSession) checkRateLimit(regionID uint64) bool {
limiter := s.client.getRegionLimiter(regionID)
// use Limiter.Allow here since if exceed the rate limit, we skip this region
// and try it later.
allowed = limiter.Allow()
if !allowed {
log.Info("EventFeed retry rate limited", zap.Uint64("regionID", regionID))
}
return
return limiter.Allow()
}

// handleError handles error returned by a region. If some new EventFeed connection should be established, the region
Expand Down

0 comments on commit 14c9b1e

Please sign in to comment.