Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: allow mergedTS to be zero in mergingChecker #6758

Merged
merged 5 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1296,51 +1296,62 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
if len(mergeMap) > 0 {
continue
}
log.Info("all the keyspace group primaries in the merge list are gone, "+
"start to calculate the newly merged TSO",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList))
// All the keyspace group primaries in the merge list are gone,
// update the newly merged TSO to make sure it is greater than the original ones.
// calculate the newly merged TSO to make sure it is greater than the original ones.
var mergedTS time.Time
for _, id := range mergeList {
ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(id))
if err != nil || ts == typeutil.ZeroTime {
if err != nil {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
log.Error("failed to load the keyspace group TSO",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList),
zap.Uint32("merge-id", id),
zap.Time("ts", ts),
zap.Error(err))
mergedTS = typeutil.ZeroTime
break
}
if ts.After(mergedTS) {
mergedTS = ts
}
}
if mergedTS == typeutil.ZeroTime {
continue
}
// Update the newly merged TSO.
// TODO: support the Local TSO Allocator.
allocator, err := am.GetAllocator(GlobalDCLocation)
if err != nil {
log.Error("failed to get the allocator",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList),
zap.Error(err))
continue
}
err = allocator.SetTSO(
tsoutil.GenerateTS(tsoutil.GenerateTimestamp(mergedTS, 1)),
true, true)
if err != nil {
log.Error("failed to update the newly merged TSO",
// Update the newly merged TSO if the merged TSO is not zero.
if mergedTS != typeutil.ZeroTime {
log.Info("start to set the newly merged TSO",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList),
zap.Time("merged-ts", mergedTS),
zap.Error(err))
continue
zap.Time("merged-ts", mergedTS))
// TODO: support the Local TSO Allocator.
allocator, err := am.GetAllocator(GlobalDCLocation)
if err != nil {
log.Error("failed to get the allocator",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList),
zap.Error(err))
continue
}
err = allocator.SetTSO(
tsoutil.GenerateTS(tsoutil.GenerateTimestamp(mergedTS, 1)),
true, true)
if err != nil {
log.Error("failed to update the newly merged TSO",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList),
zap.Time("merged-ts", mergedTS),
zap.Error(err))
continue
}
}
// Finish the merge.
err = kgm.finishMergeKeyspaceGroup(mergeTargetID)
Expand Down
4 changes: 3 additions & 1 deletion pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error {
zap.Time("last", last), zap.Time("next", next), errs.ZapError(errs.ErrIncorrectSystemTime))
next = last.Add(UpdateTimestampGuard)
}

failpoint.Inject("failedToSaveTimestamp", func() {
failpoint.Return(errs.ErrEtcdTxnInternal)
})
save := next.Add(t.saveInterval)
if err = t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil {
tsoCounter.WithLabelValues("err_save_sync_ts", t.dcLocation).Inc()
Expand Down
140 changes: 74 additions & 66 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,15 +399,33 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient()
re.Equal(uint32(1), kg1.ID)
re.Equal([]uint32{444, 555, 666}, kg1.Keyspaces)
re.False(kg1.IsSplitting())
// Make sure the leader of the keyspace group 1 is elected.
member, err := suite.tsoCluster.WaitForPrimaryServing(re, 555, 1).GetMember(555, 1)
// Request the TSO for keyspace 555 concurrently via client.
cancel := suite.dispatchClient(re, 555, 1)
// Split the keyspace group 1 to 2.
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
NewID: 2,
Keyspaces: []uint32{555, 666},
})
// Wait for the keyspace groups to finish the split.
waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{444}, []uint32{555, 666})
// Stop the client.
cancel()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow"))
}

func (suite *tsoKeyspaceGroupManagerTestSuite) dispatchClient(
re *require.Assertions, keyspaceID, keyspaceGroupID uint32,
) context.CancelFunc {
// Make sure the leader of the keyspace group is elected.
member, err := suite.tsoCluster.
WaitForPrimaryServing(re, keyspaceID, keyspaceGroupID).
GetMember(keyspaceID, keyspaceGroupID)
re.NoError(err)
re.NotNil(member)
// Prepare the client for keyspace 555.
tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 555, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{})
// Prepare the client for keyspace.
tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, keyspaceID, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{})
re.NoError(err)
re.NotNil(tsoClient)
// Request the TSO for keyspace 555 concurrently.
var (
wg sync.WaitGroup
ctx, cancel = context.WithCancel(suite.ctx)
Expand All @@ -431,7 +449,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient()
if strings.Contains(errMsg, "context canceled") ||
strings.Contains(errMsg, "not leader") ||
strings.Contains(errMsg, "not served") ||
strings.Contains(errMsg, "ErrKeyspaceNotAssigned") {
strings.Contains(errMsg, "ErrKeyspaceNotAssigned") ||
strings.Contains(errMsg, "ErrKeyspaceGroupIsMerging") {
continue
}
re.FailNow(errMsg)
Expand All @@ -444,17 +463,14 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient()
lastPhysical, lastLogical = physical, logical
}
}()
// Split the keyspace group 1 to 2.
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
NewID: 2,
Keyspaces: []uint32{555, 666},
})
// Wait for the keyspace groups to finish the split.
waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{444}, []uint32{555, 666})
// Stop the client.
cancel()
wg.Wait()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow"))
return func() {
// Wait for a while to make sure the client has sent more TSO requests.
time.Sleep(time.Second)
// Cancel the context to stop the client.
cancel()
// Wait for the client to stop.
wg.Wait()
}
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() {
Expand Down Expand Up @@ -622,68 +638,60 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient()
re.Equal(uint32(1), kg1.ID)
re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces)
re.False(kg1.IsMerging())
// Make sure the leader of the keyspace group 1 is elected.
member, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 1).GetMember(222, 1)
re.NoError(err)
re.NotNil(member)
// Prepare the client for keyspace 222.
tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 222, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{})
re.NoError(err)
re.NotNil(tsoClient)
// Request the TSO for keyspace 222 concurrently.
var (
wg sync.WaitGroup
ctx, cancel = context.WithCancel(suite.ctx)
lastPhysical, lastLogical int64
)
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
// Make sure at least one TSO request is successful.
re.NotEmpty(lastPhysical)
return
default:
}
physical, logical, err := tsoClient.GetTS(ctx)
if err != nil {
errMsg := err.Error()
// Ignore the errors caused by the merge and context cancellation.
if strings.Contains(errMsg, "context canceled") ||
strings.Contains(errMsg, "not leader") ||
strings.Contains(errMsg, "not served") ||
strings.Contains(errMsg, "ErrKeyspaceNotAssigned") ||
strings.Contains(errMsg, "ErrKeyspaceGroupIsMerging") {
continue
}
re.FailNow(errMsg)
}
if physical == lastPhysical {
re.Greater(logical, lastLogical)
} else {
re.Greater(physical, lastPhysical)
}
lastPhysical, lastLogical = physical, logical
}
}()
// Request the TSO for keyspace 222 concurrently via client.
cancel := suite.dispatchClient(re, 222, 1)
// Merge the keyspace group 1 to the default keyspace group.
handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{
MergeList: []uint32{1},
})
// Wait for the default keyspace group to finish the merge.
waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333})
// Stop the client.
cancel()
}

func waitFinishMerge(
re *require.Assertions,
server *tests.TestServer,
mergeTargetID uint32,
keyspaces []uint32,
) {
testutil.Eventually(re, func() bool {
kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID)
kg := handlersutil.MustLoadKeyspaceGroupByID(re, server, mergeTargetID)
re.Equal(mcsutils.DefaultKeyspaceGroupID, kg.ID)
for _, keyspaceID := range []uint32{111, 222, 333} {
for _, keyspaceID := range keyspaces {
re.Contains(kg.Keyspaces, keyspaceID)
}
return !kg.IsMergeTarget()
})
}

// TestTSOKeyspaceGroupMergeBeforeInitTSO merges keyspace group 1 into the
// default keyspace group while the failedToSaveTimestamp failpoint is enabled,
// and checks that the merge still finishes while a client keeps requesting TSO
// for the default keyspace.
func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeBeforeInitTSO() {
	re := suite.Require()
	// Make sure the TSO of keyspace group 1 won't be initialized before it's merged.
	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/failedToSaveTimestamp", `return(true)`))
	// Always disable the failpoint, even when an assertion below aborts the
	// test, so it cannot leak into the other tests of the suite.
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/failedToSaveTimestamp"))
	}()
	// Request the TSO for the default keyspace concurrently via client.
	cancel := suite.dispatchClient(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
	// Create the keyspace group 1 with keyspaces [111, 222, 333].
	handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
		KeyspaceGroups: []*endpoint.KeyspaceGroup{
			{
				ID:        1,
				UserKind:  endpoint.Standard.String(),
				Members:   suite.tsoCluster.GetKeyspaceGroupMember(),
				Keyspaces: []uint32{111, 222, 333},
			},
		},
	})
	// Merge the keyspace group 1 to the default keyspace group.
	handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{
		MergeList: []uint32{1},
	})
	// Wait for the default keyspace group to finish the merge.
	waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333})
	// Stop the client. The function returned by dispatchClient cancels the
	// client context and waits for the request goroutine to exit, so no
	// separate WaitGroup wait is needed here.
	cancel()
}

// See https://github.com/tikv/pd/issues/6748
Expand Down