diff --git a/client/client.go b/client/client.go index 006e303c1cd..5a4728308f0 100644 --- a/client/client.go +++ b/client/client.go @@ -265,6 +265,14 @@ func WithInitMetricsOption(initMetrics bool) ClientOption { } } +// WithAllowTSOFallback configures the client with `allowTSOFallback` option. +// NOTICE: This should only be used for testing. +func WithAllowTSOFallback() ClientOption { + return func(c *client) { + c.option.allowTSOFallback = true + } +} + var _ Client = (*client)(nil) // serviceModeKeeper is for service mode switching. diff --git a/client/option.go b/client/option.go index d6a6d61d2f9..9d46c7b1a70 100644 --- a/client/option.go +++ b/client/option.go @@ -54,6 +54,7 @@ type option struct { enableForwarding bool metricsLabels prometheus.Labels initMetrics bool + allowTSOFallback bool // Dynamic options. dynamicOptions [dynamicOptionCount]atomic.Value diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 9fd5c586bf9..c93c281ec04 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -787,7 +787,22 @@ func (c *tsoClient) compareAndSwapTS( // all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned // last time. if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { - log.Panic("[tso] timestamp fallback", + if !c.option.allowTSOFallback { + log.Panic("[tso] timestamp fallback", + zap.String("dc-location", dcLocation), + zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()), + zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), + zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)), + zap.String("last-tso-server", lastTSOInfo.tsoServer), + zap.String("cur-tso-server", curTSOInfo.tsoServer), + zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID), + zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID), + zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID), + zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID), + zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt), + zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt)) + } + log.Error("[tso] timestamp fallback", zap.String("dc-location", dcLocation), zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()), zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), @@ -799,8 +814,7 @@ func (c *tsoClient) compareAndSwapTS( zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID), zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID), zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt), - zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt), - ) + zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt)) } lastTSOInfo.tsoServer = curTSOInfo.tsoServer lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index a1c2ec08565..6727877a1c7 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -436,7 +436,7 @@ func TestMixedTSODeployment(t *testing.T) { ctx1, cancel1 := context.WithCancel(context.Background()) var wg sync.WaitGroup - checkTSO(ctx1, re, &wg, backendEndpoints) + checkTSO(ctx1, re, &wg, backendEndpoints, pd.WithAllowTSOFallback() /* It's expected that the timestamp fallback happens here */) wg.Add(1) go func() { defer wg.Done() @@ -497,12 +497,15 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/client/usePDServiceMode")) } -func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, backendEndpoints string) { +func checkTSO( + ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, + backendEndpoints string, opts ...pd.ClientOption, +) { wg.Add(tsoRequestConcurrencyNumber) for i := 0; i < tsoRequestConcurrencyNumber; i++ { go func() { defer wg.Done() - cli := mcs.SetupClientWithAPIContext(ctx, re, pd.NewAPIContextV1(), strings.Split(backendEndpoints, ",")) + cli := mcs.SetupClientWithAPIContext(ctx, re, pd.NewAPIContextV1(), strings.Split(backendEndpoints, ","), opts...) defer cli.Close() var ts, lastTS uint64 for {