diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index ec47089405c..892deec65e4 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -1296,12 +1296,17 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget if len(mergeMap) > 0 { continue } + log.Info("all the keyspace group primaries in the merge list are gone, "+ + "start to calculate the newly merged TSO", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList)) // All the keyspace group primaries in the merge list are gone, - // update the newly merged TSO to make sure it is greater than the original ones. + // calculate the newly merged TSO to make sure it is greater than the original ones. var mergedTS time.Time for _, id := range mergeList { ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(id)) - if err != nil || ts == typeutil.ZeroTime { + if err != nil { log.Error("failed to load the keyspace group TSO", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("merge-target-id", mergeTargetID), @@ -1309,38 +1314,44 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget zap.Uint32("merge-id", id), zap.Time("ts", ts), zap.Error(err)) - mergedTS = typeutil.ZeroTime break } if ts.After(mergedTS) { mergedTS = ts } } - if mergedTS == typeutil.ZeroTime { - continue - } - // Update the newly merged TSO. - // TODO: support the Local TSO Allocator. - allocator, err := am.GetAllocator(GlobalDCLocation) if err != nil { - log.Error("failed to get the allocator", - zap.String("member", kgm.tsoServiceID.ServiceAddr), - zap.Uint32("merge-target-id", mergeTargetID), - zap.Any("merge-list", mergeList), - zap.Error(err)) continue } - err = allocator.SetTSO( - tsoutil.GenerateTS(tsoutil.GenerateTimestamp(mergedTS, 1)), - true, true) - if err != nil { - log.Error("failed to update the newly merged TSO", + // Update the newly merged TSO if the merged TSO is not zero. + if mergedTS != typeutil.ZeroTime { + log.Info("start to set the newly merged TSO", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("merge-target-id", mergeTargetID), zap.Any("merge-list", mergeList), - zap.Time("merged-ts", mergedTS), - zap.Error(err)) - continue + zap.Time("merged-ts", mergedTS)) + // TODO: support the Local TSO Allocator. + allocator, err := am.GetAllocator(GlobalDCLocation) + if err != nil { + log.Error("failed to get the allocator", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Error(err)) + continue + } + err = allocator.SetTSO( + tsoutil.GenerateTS(tsoutil.GenerateTimestamp(mergedTS, 1)), + true, true) + if err != nil { + log.Error("failed to update the newly merged TSO", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Time("merged-ts", mergedTS), + zap.Error(err)) + continue + } } // Finish the merge. err = kgm.finishMergeKeyspaceGroup(mergeTargetID) diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index 1ce039a762c..757f7df8d8d 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -170,7 +170,9 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error { zap.Time("last", last), zap.Time("next", next), errs.ZapError(errs.ErrIncorrectSystemTime)) next = last.Add(UpdateTimestampGuard) } - + failpoint.Inject("failedToSaveTimestamp", func() { + failpoint.Return(errs.ErrEtcdTxnInternal) + }) save := next.Add(t.saveInterval) if err = t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { tsoCounter.WithLabelValues("err_save_sync_ts", t.dcLocation).Inc() diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 0f7c916555d..3f00f85cacb 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -399,15 +399,33 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() re.Equal(uint32(1), kg1.ID) re.Equal([]uint32{444, 555, 666}, kg1.Keyspaces) re.False(kg1.IsSplitting()) - // Make sure the leader of the keyspace group 1 is elected. - member, err := suite.tsoCluster.WaitForPrimaryServing(re, 555, 1).GetMember(555, 1) + // Request the TSO for keyspace 555 concurrently via client. + cancel := suite.dispatchClient(re, 555, 1) + // Split the keyspace group 1 to 2. + handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ + NewID: 2, + Keyspaces: []uint32{555, 666}, + }) + // Wait for the keyspace groups to finish the split. + waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{444}, []uint32{555, 666}) + // Stop the client. + cancel() + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow")) +} + +func (suite *tsoKeyspaceGroupManagerTestSuite) dispatchClient( + re *require.Assertions, keyspaceID, keyspaceGroupID uint32, +) context.CancelFunc { + // Make sure the leader of the keyspace group is elected. + member, err := suite.tsoCluster. + WaitForPrimaryServing(re, keyspaceID, keyspaceGroupID). + GetMember(keyspaceID, keyspaceGroupID) re.NoError(err) re.NotNil(member) - // Prepare the client for keyspace 555. - tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 555, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) + // Prepare the client for keyspace. + tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, keyspaceID, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) re.NoError(err) re.NotNil(tsoClient) - // Request the TSO for keyspace 555 concurrently. var ( wg sync.WaitGroup ctx, cancel = context.WithCancel(suite.ctx) @@ -431,7 +449,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() if strings.Contains(errMsg, "context canceled") || strings.Contains(errMsg, "not leader") || strings.Contains(errMsg, "not served") || - strings.Contains(errMsg, "ErrKeyspaceNotAssigned") { + strings.Contains(errMsg, "ErrKeyspaceNotAssigned") || + strings.Contains(errMsg, "ErrKeyspaceGroupIsMerging") { continue } re.FailNow(errMsg) @@ -444,17 +463,14 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() lastPhysical, lastLogical = physical, logical } }() - // Split the keyspace group 1 to 2. - handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ - NewID: 2, - Keyspaces: []uint32{555, 666}, - }) - // Wait for the keyspace groups to finish the split. - waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{444}, []uint32{555, 666}) - // Stop the client. - cancel() - wg.Wait() - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow")) + return func() { + // Wait for a while to make sure the client has sent more TSO requests. + time.Sleep(time.Second) + // Cancel the context to stop the client. + cancel() + // Wait for the client to stop. + wg.Wait() + } } func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() { @@ -622,68 +638,60 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient() re.Equal(uint32(1), kg1.ID) re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) re.False(kg1.IsMerging()) - // Make sure the leader of the keyspace group 1 is elected. - member, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 1).GetMember(222, 1) - re.NoError(err) - re.NotNil(member) - // Prepare the client for keyspace 222. - tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 222, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) - re.NoError(err) - re.NotNil(tsoClient) - // Request the TSO for keyspace 222 concurrently. - var ( - wg sync.WaitGroup - ctx, cancel = context.WithCancel(suite.ctx) - lastPhysical, lastLogical int64 - ) - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-ctx.Done(): - // Make sure at least one TSO request is successful. - re.NotEmpty(lastPhysical) - return - default: - } - physical, logical, err := tsoClient.GetTS(ctx) - if err != nil { - errMsg := err.Error() - // Ignore the errors caused by the merge and context cancellation. - if strings.Contains(errMsg, "context canceled") || - strings.Contains(errMsg, "not leader") || - strings.Contains(errMsg, "not served") || - strings.Contains(errMsg, "ErrKeyspaceNotAssigned") || - strings.Contains(errMsg, "ErrKeyspaceGroupIsMerging") { - continue - } - re.FailNow(errMsg) - } - if physical == lastPhysical { - re.Greater(logical, lastLogical) - } else { - re.Greater(physical, lastPhysical) - } - lastPhysical, lastLogical = physical, logical - } - }() + // Request the TSO for keyspace 222 concurrently via client. + cancel := suite.dispatchClient(re, 222, 1) // Merge the keyspace group 1 to the default keyspace group. handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ MergeList: []uint32{1}, }) // Wait for the default keyspace group to finish the merge. + waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333}) + // Stop the client. + cancel() +} + +func waitFinishMerge( + re *require.Assertions, + server *tests.TestServer, + mergeTargetID uint32, + keyspaces []uint32, +) { testutil.Eventually(re, func() bool { - kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID) + kg := handlersutil.MustLoadKeyspaceGroupByID(re, server, mergeTargetID) re.Equal(mcsutils.DefaultKeyspaceGroupID, kg.ID) - for _, keyspaceID := range []uint32{111, 222, 333} { + for _, keyspaceID := range keyspaces { re.Contains(kg.Keyspaces, keyspaceID) } return !kg.IsMergeTarget() }) +} + +func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeBeforeInitTSO() { + re := suite.Require() + // Make sure the TSO of keyspace group 1 won't be initialized before it's merged. + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/failedToSaveTimestamp", `return(true)`)) + // Request the TSO for the default keyspace concurrently via client. + cancel := suite.dispatchClient(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) + // Create the keyspace group 1 with keyspaces [111, 222, 333]. + handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ + KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: 1, + UserKind: endpoint.Standard.String(), + Members: suite.tsoCluster.GetKeyspaceGroupMember(), + Keyspaces: []uint32{111, 222, 333}, + }, + }, + }) + // Merge the keyspace group 1 to the default keyspace group. + handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ + MergeList: []uint32{1}, + }) + // Wait for the default keyspace group to finish the merge. + waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333}) // Stop the client. cancel() - wg.Wait() + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/failedToSaveTimestamp")) } // See https://github.com/tikv/pd/issues/6748