diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 340a12e5a2a..0cc583e2ef0 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -505,6 +505,8 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { } }) + tableID, tableName := util.TableIDFromCtx(ctx) + cfID := util.ChangefeedIDFromCtx(ctx) g.Go(func() error { checkRateLimitInterval := 50 * time.Millisecond timer := time.NewTimer(checkRateLimitInterval) @@ -520,7 +522,13 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { s.errChSizeGauge.Dec() allowed := s.checkRateLimit(errInfo.singleRegionInfo.verID.GetID()) if !allowed { - // rate limit triggers, add the error info to the rate limit queue + // rate limit triggers, add the error info to the rate limit queue. + log.Info("EventFeed retry rate limited", + zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()), + zap.Uint64("ts", errInfo.singleRegionInfo.ts), + zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span), + zap.Int64("tableID", tableID), zap.String("tableName", tableName), + zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr)) s.rateLimitQueue = append(s.rateLimitQueue, errInfo) } else { err := s.handleError(ctx, errInfo) @@ -967,15 +975,11 @@ func (s *eventFeedSession) handleRateLimit(ctx context.Context) { } // checkRateLimit checks whether a region can be reconnected based on its rate limiter -func (s *eventFeedSession) checkRateLimit(regionID uint64) (allowed bool) { +func (s *eventFeedSession) checkRateLimit(regionID uint64) bool { limiter := s.client.getRegionLimiter(regionID) // use Limiter.Allow here since if exceed the rate limit, we skip this region // and try it later. - allowed = limiter.Allow() - if !allowed { - log.Info("EventFeed retry rate limited", zap.Uint64("regionID", regionID)) - } - return + return limiter.Allow() } // handleError handles error returned by a region. If some new EventFeed connection should be established, the region