From 00cf222f226dd90aad62fb547f44c11d78818f55 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 5 Jul 2023 18:05:51 +0800 Subject: [PATCH 1/4] Allow mergedTS to be zero in mergingChecker Signed-off-by: JmPotato --- pkg/tso/keyspace_group_manager.go | 55 ++++++++++++++++++------------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index ec47089405c..892deec65e4 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -1296,12 +1296,17 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget if len(mergeMap) > 0 { continue } + log.Info("all the keyspace group primaries in the merge list are gone, "+ + "start to calculate the newly merged TSO", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList)) // All the keyspace group primaries in the merge list are gone, - // update the newly merged TSO to make sure it is greater than the original ones. + // calculate the newly merged TSO to make sure it is greater than the original ones. var mergedTS time.Time for _, id := range mergeList { ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(id)) - if err != nil || ts == typeutil.ZeroTime { + if err != nil { log.Error("failed to load the keyspace group TSO", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("merge-target-id", mergeTargetID), @@ -1309,38 +1314,44 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget zap.Uint32("merge-id", id), zap.Time("ts", ts), zap.Error(err)) - mergedTS = typeutil.ZeroTime break } if ts.After(mergedTS) { mergedTS = ts } } - if mergedTS == typeutil.ZeroTime { - continue - } - // Update the newly merged TSO. - // TODO: support the Local TSO Allocator. - allocator, err := am.GetAllocator(GlobalDCLocation) if err != nil { - log.Error("failed to get the allocator", - zap.String("member", kgm.tsoServiceID.ServiceAddr), - zap.Uint32("merge-target-id", mergeTargetID), - zap.Any("merge-list", mergeList), - zap.Error(err)) continue } - err = allocator.SetTSO( - tsoutil.GenerateTS(tsoutil.GenerateTimestamp(mergedTS, 1)), - true, true) - if err != nil { - log.Error("failed to update the newly merged TSO", + // Update the newly merged TSO if the merged TSO is not zero. + if mergedTS != typeutil.ZeroTime { + log.Info("start to set the newly merged TSO", zap.String("member", kgm.tsoServiceID.ServiceAddr), zap.Uint32("merge-target-id", mergeTargetID), zap.Any("merge-list", mergeList), - zap.Time("merged-ts", mergedTS), - zap.Error(err)) - continue + zap.Time("merged-ts", mergedTS)) + // TODO: support the Local TSO Allocator. + allocator, err := am.GetAllocator(GlobalDCLocation) + if err != nil { + log.Error("failed to get the allocator", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Error(err)) + continue + } + err = allocator.SetTSO( + tsoutil.GenerateTS(tsoutil.GenerateTimestamp(mergedTS, 1)), + true, true) + if err != nil { + log.Error("failed to update the newly merged TSO", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Time("merged-ts", mergedTS), + zap.Error(err)) + continue + } } // Finish the merge. err = kgm.finishMergeKeyspaceGroup(mergeTargetID) From 21e07e18276e49133a520dd5df9996afef821b86 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 6 Jul 2023 11:53:13 +0800 Subject: [PATCH 2/4] Add a test Signed-off-by: JmPotato --- pkg/tso/keyspace_group_manager.go | 7 + .../mcs/tso/keyspace_group_manager_test.go | 134 +++++++++--------- 2 files changed, 76 insertions(+), 65 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 892deec65e4..8b125379040 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -738,6 +738,13 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro return splitSourceAM.GetMember().IsLeader() }) } + + failpoint.Inject("electionCampaignFailed", func() { + participant.SetCampaignChecker(func(leadership *election.Leadership) bool { + return false + }) + }) + // Only the default keyspace group uses the legacy service root path for LoadTimestamp/SyncTimestamp. var ( tsRootPath string diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 0f7c916555d..43650b77dcd 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -399,15 +399,34 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() re.Equal(uint32(1), kg1.ID) re.Equal([]uint32{444, 555, 666}, kg1.Keyspaces) re.False(kg1.IsSplitting()) - // Make sure the leader of the keyspace group 1 is elected. - member, err := suite.tsoCluster.WaitForPrimaryServing(re, 555, 1).GetMember(555, 1) + // Request the TSO for keyspace 555 concurrently via client. + wg, cancel := suite.dispatchClient(re, 555, 1) + // Split the keyspace group 1 to 2. + handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ + NewID: 2, + Keyspaces: []uint32{555, 666}, + }) + // Wait for the keyspace groups to finish the split. + waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{444}, []uint32{555, 666}) + // Stop the client. + cancel() + wg.Wait() + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow")) +} + +func (suite *tsoKeyspaceGroupManagerTestSuite) dispatchClient( + re *require.Assertions, keyspaceID, keyspaceGroupID uint32, +) (*sync.WaitGroup, context.CancelFunc) { + // Make sure the leader of the keyspace group is elected. + member, err := suite.tsoCluster. + WaitForPrimaryServing(re, keyspaceID, keyspaceGroupID). + GetMember(keyspaceID, keyspaceGroupID) re.NoError(err) re.NotNil(member) - // Prepare the client for keyspace 555. - tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 555, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) + // Prepare the client for keyspace. + tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, keyspaceID, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) re.NoError(err) re.NotNil(tsoClient) - // Request the TSO for keyspace 555 concurrently. var ( wg sync.WaitGroup ctx, cancel = context.WithCancel(suite.ctx) @@ -431,7 +450,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() if strings.Contains(errMsg, "context canceled") || strings.Contains(errMsg, "not leader") || strings.Contains(errMsg, "not served") || - strings.Contains(errMsg, "ErrKeyspaceNotAssigned") { + strings.Contains(errMsg, "ErrKeyspaceNotAssigned") || + strings.Contains(errMsg, "ErrKeyspaceGroupIsMerging") { continue } re.FailNow(errMsg) @@ -444,17 +464,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() lastPhysical, lastLogical = physical, logical } }() - // Split the keyspace group 1 to 2. - handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ - NewID: 2, - Keyspaces: []uint32{555, 666}, - }) - // Wait for the keyspace groups to finish the split. - waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{444}, []uint32{555, 666}) - // Stop the client. - cancel() - wg.Wait() - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow")) + return &wg, cancel } func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() { @@ -622,68 +632,62 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient() re.Equal(uint32(1), kg1.ID) re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) re.False(kg1.IsMerging()) - // Make sure the leader of the keyspace group 1 is elected. - member, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 1).GetMember(222, 1) - re.NoError(err) - re.NotNil(member) - // Prepare the client for keyspace 222. - tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 222, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) - re.NoError(err) - re.NotNil(tsoClient) - // Request the TSO for keyspace 222 concurrently. - var ( - wg sync.WaitGroup - ctx, cancel = context.WithCancel(suite.ctx) - lastPhysical, lastLogical int64 - ) - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-ctx.Done(): - // Make sure at least one TSO request is successful. - re.NotEmpty(lastPhysical) - return - default: - } - physical, logical, err := tsoClient.GetTS(ctx) - if err != nil { - errMsg := err.Error() - // Ignore the errors caused by the merge and context cancellation. - if strings.Contains(errMsg, "context canceled") || - strings.Contains(errMsg, "not leader") || - strings.Contains(errMsg, "not served") || - strings.Contains(errMsg, "ErrKeyspaceNotAssigned") || - strings.Contains(errMsg, "ErrKeyspaceGroupIsMerging") { - continue - } - re.FailNow(errMsg) - } - if physical == lastPhysical { - re.Greater(logical, lastLogical) - } else { - re.Greater(physical, lastPhysical) - } - lastPhysical, lastLogical = physical, logical - } - }() + // Request the TSO for keyspace 222 concurrently via client. + wg, cancel := suite.dispatchClient(re, 222, 1) // Merge the keyspace group 1 to the default keyspace group. handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ MergeList: []uint32{1}, }) // Wait for the default keyspace group to finish the merge. + waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333}) + // Stop the client. + cancel() + wg.Wait() +} + +func waitFinishMerge( + re *require.Assertions, + server *tests.TestServer, + mergeTargetID uint32, + keyspaces []uint32, +) { testutil.Eventually(re, func() bool { - kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID) + kg := handlersutil.MustLoadKeyspaceGroupByID(re, server, mergeTargetID) re.Equal(mcsutils.DefaultKeyspaceGroupID, kg.ID) - for _, keyspaceID := range []uint32{111, 222, 333} { + for _, keyspaceID := range keyspaces { re.Contains(kg.Keyspaces, keyspaceID) } return !kg.IsMergeTarget() }) +} + +func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeBeforeInitTSO() { + re := suite.Require() + // Make sure the TSO of keyspace group 1 won't be initialized. + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/electionCampaignFailed", `return(true)`)) + // Request the TSO for the default keyspace concurrently via client. + wg, cancel := suite.dispatchClient(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) + // Create the keyspace group 1 with keyspaces [111, 222, 333]. + handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ + KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: 1, + UserKind: endpoint.Standard.String(), + Members: suite.tsoCluster.GetKeyspaceGroupMember(), + Keyspaces: []uint32{111, 222, 333}, + }, + }, + }) + // Merge the keyspace group 1 to the default keyspace group. + handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ + MergeList: []uint32{1}, + }) + // Wait for the default keyspace group to finish the merge. + waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333}) // Stop the client. cancel() wg.Wait() + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/electionCampaignFailed")) } // See https://github.com/tikv/pd/issues/6748 From 97b3f1cd9561db988da6aa26ef62e9cd697a0a9b Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 6 Jul 2023 12:00:45 +0800 Subject: [PATCH 3/4] Refine the failpoint Signed-off-by: JmPotato --- pkg/tso/keyspace_group_manager.go | 7 ------- pkg/tso/tso.go | 4 +++- tests/integrations/mcs/tso/keyspace_group_manager_test.go | 6 +++--- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 8b125379040..892deec65e4 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -738,13 +738,6 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro return splitSourceAM.GetMember().IsLeader() }) } - - failpoint.Inject("electionCampaignFailed", func() { - participant.SetCampaignChecker(func(leadership *election.Leadership) bool { - return false - }) - }) - // Only the default keyspace group uses the legacy service root path for LoadTimestamp/SyncTimestamp. var ( tsRootPath string diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index 1ce039a762c..757f7df8d8d 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -170,7 +170,9 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error { zap.Time("last", last), zap.Time("next", next), errs.ZapError(errs.ErrIncorrectSystemTime)) next = last.Add(UpdateTimestampGuard) } - + failpoint.Inject("failedToSaveTimestamp", func() { + failpoint.Return(errs.ErrEtcdTxnInternal) + }) save := next.Add(t.saveInterval) if err = t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { tsoCounter.WithLabelValues("err_save_sync_ts", t.dcLocation).Inc() diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 43650b77dcd..66de282c669 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -663,8 +663,8 @@ func waitFinishMerge( func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeBeforeInitTSO() { re := suite.Require() - // Make sure the TSO of keyspace group 1 won't be initialized. - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/electionCampaignFailed", `return(true)`)) + // Make sure the TSO of keyspace group 1 won't be initialized before it's merged. + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/failedToSaveTimestamp", `return(true)`)) // Request the TSO for the default keyspace concurrently via client. wg, cancel := suite.dispatchClient(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) // Create the keyspace group 1 with keyspaces [111, 222, 333]. @@ -687,7 +687,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeBeforeIn // Stop the client. cancel() wg.Wait() - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/electionCampaignFailed")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/failedToSaveTimestamp")) } // See https://github.com/tikv/pd/issues/6748 From b2a1924b3c8837901a6bc28e09cdf615a1cbbe65 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 6 Jul 2023 12:38:00 +0800 Subject: [PATCH 4/4] Refine the code Signed-off-by: JmPotato --- .../mcs/tso/keyspace_group_manager_test.go | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 66de282c669..3f00f85cacb 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -400,7 +400,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() re.Equal([]uint32{444, 555, 666}, kg1.Keyspaces) re.False(kg1.IsSplitting()) // Request the TSO for keyspace 555 concurrently via client. - wg, cancel := suite.dispatchClient(re, 555, 1) + cancel := suite.dispatchClient(re, 555, 1) // Split the keyspace group 1 to 2. handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ NewID: 2, @@ -410,13 +410,12 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{444}, []uint32{555, 666}) // Stop the client. cancel() - wg.Wait() re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow")) } func (suite *tsoKeyspaceGroupManagerTestSuite) dispatchClient( re *require.Assertions, keyspaceID, keyspaceGroupID uint32, -) (*sync.WaitGroup, context.CancelFunc) { +) context.CancelFunc { // Make sure the leader of the keyspace group is elected. member, err := suite.tsoCluster. WaitForPrimaryServing(re, keyspaceID, keyspaceGroupID). @@ -464,7 +463,14 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) dispatchClient( lastPhysical, lastLogical = physical, logical } }() - return &wg, cancel + return func() { + // Wait for a while to make sure the client has sent more TSO requests. + time.Sleep(time.Second) + // Cancel the context to stop the client. + cancel() + // Wait for the client to stop. + wg.Wait() + } } func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() { @@ -633,7 +639,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient() re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) re.False(kg1.IsMerging()) // Request the TSO for keyspace 222 concurrently via client. - wg, cancel := suite.dispatchClient(re, 222, 1) + cancel := suite.dispatchClient(re, 222, 1) // Merge the keyspace group 1 to the default keyspace group. handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ MergeList: []uint32{1}, @@ -642,7 +648,6 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient() waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333}) // Stop the client. cancel() - wg.Wait() } func waitFinishMerge( @@ -666,7 +671,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeBeforeIn // Make sure the TSO of keyspace group 1 won't be initialized before it's merged. re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/failedToSaveTimestamp", `return(true)`)) // Request the TSO for the default keyspace concurrently via client. - wg, cancel := suite.dispatchClient(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) + cancel := suite.dispatchClient(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) // Create the keyspace group 1 with keyspaces [111, 222, 333]. handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ @@ -686,7 +691,6 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeBeforeIn waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333}) // Stop the client. cancel() - wg.Wait() re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/failedToSaveTimestamp")) }