diff --git a/errors.toml b/errors.toml index b6199698ec5..b6f6621957f 100644 --- a/errors.toml +++ b/errors.toml @@ -1,21 +1,6 @@ # AUTOGENERATED BY github.com/pingcap/errors/errdoc-gen # YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER. -["ErrLoadKeyspaceGroupsRetryExhausted"] -error = ''' -load keyspace groups retry exhausted, %s -''' - -["ErrLoadKeyspaceGroupsTerminated"] -error = ''' -load keyspace groups terminated -''' - -["ErrLoadKeyspaceGroupsTimeout"] -error = ''' -load keyspace groups timeout -''' - ["PD:ErrEncryptionKMS"] error = ''' KMS error @@ -506,6 +491,11 @@ error = ''' marshal leader failed ''' +["PD:member:ErrPreCheckCampaign"] +error = ''' +pre-check campaign failed +''' + ["PD:netstat:ErrNetstatTCPSocks"] error = ''' TCP socks error @@ -761,11 +751,31 @@ error = ''' the keyspace group id is invalid, %s ''' +["PD:tso:ErrKeyspaceGroupNotInitialized"] +error = ''' +the keyspace group %d isn't initialized +''' + ["PD:tso:ErrKeyspaceNotAssigned"] error = ''' the keyspace %d isn't assigned to any keyspace group ''' +["PD:tso:ErrLoadKeyspaceGroupsRetryExhausted"] +error = ''' +load keyspace groups retry exhausted, %s +''' + +["PD:tso:ErrLoadKeyspaceGroupsTerminated"] +error = ''' +load keyspace groups terminated +''' + +["PD:tso:ErrLoadKeyspaceGroupsTimeout"] +error = ''' +load keyspace groups timeout +''' + ["PD:tso:ErrLogicOverflow"] error = ''' logic part overflow diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 3a756e8000d..eb22d7584b2 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -49,9 +49,10 @@ var ( ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout")) ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid")) ErrGetAllocatorManager = errors.Normalize("get allocator manager failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocatorManager")) - ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout")) - ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated")) - ErrLoadKeyspaceGroupsRetryExhausted = errors.Normalize("load keyspace groups retry exhausted, %s", errors.RFCCodeText("ErrLoadKeyspaceGroupsRetryExhausted")) + ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("PD:tso:ErrLoadKeyspaceGroupsTimeout")) + ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("PD:tso:ErrLoadKeyspaceGroupsTerminated")) + ErrLoadKeyspaceGroupsRetryExhausted = errors.Normalize("load keyspace groups retry exhausted, %s", errors.RFCCodeText("PD:tso:ErrLoadKeyspaceGroupsRetryExhausted")) + ErrKeyspaceGroupNotInitialized = errors.Normalize("the keyspace group %d isn't initialized", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupNotInitialized")) ErrKeyspaceNotAssigned = errors.Normalize("the keyspace %d isn't assigned to any keyspace group", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned")) ) @@ -59,6 +60,7 @@ var ( var ( ErrEtcdLeaderNotFound = errors.Normalize("etcd leader not found", errors.RFCCodeText("PD:member:ErrEtcdLeaderNotFound")) ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader")) + ErrPreCheckCampaign = errors.Normalize("pre-check campaign failed", errors.RFCCodeText("PD:member:ErrPreCheckCampaign")) ) // core errors diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index ed0b46cbe94..82a606ca205 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -147,7 +147,7 @@ func (m *GroupManager) DeleteKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGro if kg == nil { return nil } - if kg.InSplit { + if kg.IsSplitting() { return ErrKeyspaceGroupInSplit } return m.store.DeleteKeyspaceGroup(txn, id) @@ -176,17 +176,24 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro if oldKG != nil && !overwrite { return ErrKeyspaceGroupExists } - if oldKG != nil && oldKG.InSplit && overwrite { + if oldKG.IsSplitting() && overwrite { return ErrKeyspaceGroupInSplit } - m.store.SaveKeyspaceGroup(txn, &endpoint.KeyspaceGroup{ + newKG := &endpoint.KeyspaceGroup{ ID: keyspaceGroup.ID, UserKind: keyspaceGroup.UserKind, Members: keyspaceGroup.Members, Keyspaces: keyspaceGroup.Keyspaces, - InSplit: keyspaceGroup.InSplit, - SplitFrom: keyspaceGroup.SplitFrom, - }) + } + if oldKG.IsSplitting() { + newKG.SplitState = &endpoint.SplitState{ + SplitSource: oldKG.SplitState.SplitSource, + } + } + err = m.store.SaveKeyspaceGroup(txn, newKG) + if err != nil { + return err + } } return nil }) @@ -230,7 +237,7 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI if kg == nil { return errors.Errorf("keyspace group %d not found", id) } - if kg.InSplit { + if kg.IsSplitting() { return ErrKeyspaceGroupInSplit } switch mutation { @@ -276,7 +283,7 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse if newKG == nil { return errors.Errorf("keyspace group %s not found in %s group", newGroupID, newUserKind) } - if oldKG.InSplit || newKG.InSplit { + if oldKG.IsSplitting() || newKG.IsSplitting() { return ErrKeyspaceGroupInSplit } @@ -308,40 +315,41 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse // SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID. // And the keyspaces in the old keyspace group will be moved to the new keyspace group. -func (m *GroupManager) SplitKeyspaceGroupByID(splitFromID, splitToID uint32, keyspaces []uint32) error { - var splitFromKg, splitToKg *endpoint.KeyspaceGroup +func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint32, keyspaces []uint32) error { + var splitSourceKg, splitTargetKg *endpoint.KeyspaceGroup m.Lock() defer m.Unlock() // TODO: avoid to split when the keyspaces is empty. if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { // Load the old keyspace group first. - splitFromKg, err = m.store.LoadKeyspaceGroup(txn, splitFromID) + splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitSourceID) if err != nil { return err } - if splitFromKg == nil { + if splitSourceKg == nil { return ErrKeyspaceGroupNotFound } - if splitFromKg.InSplit { + // A keyspace group can not take part in multiple split processes. + if splitSourceKg.IsSplitting() { return ErrKeyspaceGroupInSplit } // Check if the new keyspace group already exists. - splitToKg, err = m.store.LoadKeyspaceGroup(txn, splitToID) + splitTargetKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetID) if err != nil { return err } - if splitToKg != nil { + if splitTargetKg != nil { return ErrKeyspaceGroupExists } // Check if the keyspaces are all in the old keyspace group. - if len(keyspaces) > len(splitFromKg.Keyspaces) { + if len(keyspaces) > len(splitSourceKg.Keyspaces) { return ErrKeyspaceNotInKeyspaceGroup } var ( - oldKeyspaceMap = make(map[uint32]struct{}, len(splitFromKg.Keyspaces)) + oldKeyspaceMap = make(map[uint32]struct{}, len(splitSourceKg.Keyspaces)) newKeyspaceMap = make(map[uint32]struct{}, len(keyspaces)) ) - for _, keyspace := range splitFromKg.Keyspaces { + for _, keyspace := range splitSourceKg.Keyspaces { oldKeyspaceMap[keyspace] = struct{}{} } for _, keyspace := range keyspaces { @@ -351,75 +359,77 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitFromID, splitToID uint32, key newKeyspaceMap[keyspace] = struct{}{} } // Get the split keyspace group for the old keyspace group. - splitKeyspaces := make([]uint32, 0, len(splitFromKg.Keyspaces)-len(keyspaces)) - for _, keyspace := range splitFromKg.Keyspaces { + splitKeyspaces := make([]uint32, 0, len(splitSourceKg.Keyspaces)-len(keyspaces)) + for _, keyspace := range splitSourceKg.Keyspaces { if _, ok := newKeyspaceMap[keyspace]; !ok { splitKeyspaces = append(splitKeyspaces, keyspace) } } // Update the old keyspace group. - splitFromKg.Keyspaces = splitKeyspaces - splitFromKg.InSplit = true - if err = m.store.SaveKeyspaceGroup(txn, splitFromKg); err != nil { + splitSourceKg.Keyspaces = splitKeyspaces + splitSourceKg.SplitState = &endpoint.SplitState{ + SplitSource: splitSourceKg.ID, + } + if err = m.store.SaveKeyspaceGroup(txn, splitSourceKg); err != nil { return err } - splitToKg = &endpoint.KeyspaceGroup{ - ID: splitToID, + splitTargetKg = &endpoint.KeyspaceGroup{ + ID: splitTargetID, // Keep the same user kind and members as the old keyspace group. - UserKind: splitFromKg.UserKind, - Members: splitFromKg.Members, + UserKind: splitSourceKg.UserKind, + Members: splitSourceKg.Members, Keyspaces: keyspaces, - // Only set the new keyspace group in split state. - InSplit: true, - SplitFrom: splitFromKg.ID, + SplitState: &endpoint.SplitState{ + SplitSource: splitSourceKg.ID, + }, } // Create the new split keyspace group. - return m.store.SaveKeyspaceGroup(txn, splitToKg) + return m.store.SaveKeyspaceGroup(txn, splitTargetKg) }); err != nil { return err } // Update the keyspace group cache. - m.groups[endpoint.StringUserKind(splitFromKg.UserKind)].Put(splitFromKg) - m.groups[endpoint.StringUserKind(splitToKg.UserKind)].Put(splitToKg) + m.groups[endpoint.StringUserKind(splitSourceKg.UserKind)].Put(splitSourceKg) + m.groups[endpoint.StringUserKind(splitTargetKg.UserKind)].Put(splitTargetKg) return nil } -// FinishSplitKeyspaceByID finishes the split keyspace group by the split-to ID. -func (m *GroupManager) FinishSplitKeyspaceByID(splitToID uint32) error { - var splitToKg, splitFromKg *endpoint.KeyspaceGroup +// FinishSplitKeyspaceByID finishes the split keyspace group by the split target ID. +func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error { + var splitTargetKg, splitSourceKg *endpoint.KeyspaceGroup m.Lock() defer m.Unlock() if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { - // Load the split-to keyspace group first. - splitToKg, err = m.store.LoadKeyspaceGroup(txn, splitToID) + // Load the split target keyspace group first. + splitTargetKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetID) if err != nil { return err } - if splitToKg == nil { + if splitTargetKg == nil { return ErrKeyspaceGroupNotFound } // Check if it's in the split state. - if !splitToKg.InSplit { + if !splitTargetKg.IsSplitTarget() { return ErrKeyspaceGroupNotInSplit } - // Load the split-from keyspace group then. - splitFromKg, err = m.store.LoadKeyspaceGroup(txn, splitToKg.SplitFrom) + // Load the split source keyspace group then. + splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetKg.SplitSource()) if err != nil { return err } - if splitFromKg == nil { + if splitSourceKg == nil { return ErrKeyspaceGroupNotFound } - if !splitFromKg.InSplit { + if !splitSourceKg.IsSplitSource() { return ErrKeyspaceGroupNotInSplit } - splitToKg.InSplit = false - splitFromKg.InSplit = false - err = m.store.SaveKeyspaceGroup(txn, splitToKg) + splitTargetKg.SplitState = nil + splitSourceKg.SplitState = nil + err = m.store.SaveKeyspaceGroup(txn, splitTargetKg) if err != nil { return err } - err = m.store.SaveKeyspaceGroup(txn, splitFromKg) + err = m.store.SaveKeyspaceGroup(txn, splitSourceKg) if err != nil { return err } @@ -428,7 +438,7 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitToID uint32) error { return err } // Update the keyspace group cache. - m.groups[endpoint.StringUserKind(splitToKg.UserKind)].Put(splitToKg) - m.groups[endpoint.StringUserKind(splitFromKg.UserKind)].Put(splitFromKg) + m.groups[endpoint.StringUserKind(splitTargetKg.UserKind)].Put(splitTargetKg) + m.groups[endpoint.StringUserKind(splitSourceKg.UserKind)].Put(splitSourceKg) return nil } diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go index 3878e8481f4..80f4d713c35 100644 --- a/pkg/keyspace/tso_keyspace_group_test.go +++ b/pkg/keyspace/tso_keyspace_group_test.go @@ -87,12 +87,12 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupOperations() { re.NoError(err) re.Equal(uint32(0), kg.ID) re.Equal(endpoint.Basic.String(), kg.UserKind) - re.False(kg.InSplit) + re.False(kg.IsSplitting()) kg, err = suite.kgm.GetKeyspaceGroupByID(3) re.NoError(err) re.Equal(uint32(3), kg.ID) re.Equal(endpoint.Standard.String(), kg.UserKind) - re.False(kg.InSplit) + re.False(kg.IsSplitting()) // remove the keyspace group 3 kg, err = suite.kgm.DeleteKeyspaceGroupByID(3) re.NoError(err) @@ -253,14 +253,14 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { re.NoError(err) re.Equal(uint32(2), kg2.ID) re.Equal([]uint32{111, 222}, kg2.Keyspaces) - re.True(kg2.InSplit) - re.Empty(kg2.SplitFrom) + re.True(kg2.IsSplitSource()) + re.Equal(kg2.ID, kg2.SplitSource()) kg4, err := suite.kgm.GetKeyspaceGroupByID(4) re.NoError(err) re.Equal(uint32(4), kg4.ID) re.Equal([]uint32{333}, kg4.Keyspaces) - re.True(kg4.InSplit) - re.Equal(kg2.ID, kg4.SplitFrom) + re.True(kg4.IsSplitTarget()) + re.Equal(kg2.ID, kg4.SplitSource()) re.Equal(kg2.UserKind, kg4.UserKind) re.Equal(kg2.Members, kg4.Members) @@ -293,14 +293,12 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { re.NoError(err) re.Equal(uint32(2), kg2.ID) re.Equal([]uint32{111, 222}, kg2.Keyspaces) - re.False(kg2.InSplit) - re.Empty(kg2.SplitFrom) + re.False(kg2.IsSplitting()) kg4, err = suite.kgm.GetKeyspaceGroupByID(4) re.NoError(err) re.Equal(uint32(4), kg4.ID) re.Equal([]uint32{333}, kg4.Keyspaces) - re.False(kg4.InSplit) - re.Equal(kg2.ID, kg4.SplitFrom) + re.False(kg4.IsSplitting()) re.Equal(kg2.UserKind, kg4.UserKind) re.Equal(kg2.Members, kg4.Members) diff --git a/pkg/mcs/tso/server/handler.go b/pkg/mcs/tso/server/handler.go index 827b6773ba1..4ce638e04b9 100644 --- a/pkg/mcs/tso/server/handler.go +++ b/pkg/mcs/tso/server/handler.go @@ -17,7 +17,6 @@ package server import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" - mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/tso" "go.uber.org/zap" ) @@ -31,14 +30,16 @@ func newHandler(s *Server) *Handler { return &Handler{s: s} } -// ResetTS resets the ts with specified tso. -// TODO: Support multiple keyspace groups. -func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error { +// ResetTS resets the TSO with the specified one. +func (h *Handler) ResetTS( + ts uint64, ignoreSmaller, skipUpperBoundCheck bool, keyspaceGroupID uint32, +) error { log.Info("reset-ts", zap.Uint64("new-ts", ts), zap.Bool("ignore-smaller", ignoreSmaller), - zap.Bool("skip-upper-bound-check", skipUpperBoundCheck)) - tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeyspaceGroupID) + zap.Bool("skip-upper-bound-check", skipUpperBoundCheck), + zap.Uint32("keyspace-group-id", keyspaceGroupID)) + tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(keyspaceGroupID) if err != nil { log.Error("failed to get allocator manager", errs.ZapError(err)) return err diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index ebf389736f7..14365aa9650 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -442,7 +442,7 @@ func (s *Server) startServer() (err error) { tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID) s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( - s.serverLoopCtx, s.serviceID, s.etcdClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg) + s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg) if err := s.keyspaceGroupManager.Initialize(); err != nil { return err } diff --git a/pkg/member/member.go b/pkg/member/member.go index 399305089c2..24f6eea4b54 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -173,8 +173,8 @@ func (m *EmbeddedEtcdMember) KeepLeader(ctx context.Context) { m.leadership.Keep(ctx) } -// PrecheckLeader does some pre-check before checking whether or not it's the leader. -func (m *EmbeddedEtcdMember) PrecheckLeader() error { +// PreCheckLeader does some pre-check before checking whether or not it's the leader. +func (m *EmbeddedEtcdMember) PreCheckLeader() error { if m.GetEtcdLeader() == 0 { return errs.ErrEtcdLeaderNotFound } @@ -198,7 +198,7 @@ func (m *EmbeddedEtcdMember) getPersistentLeader() (*pdpb.Member, int64, error) // CheckLeader checks if someone else is taking the leadership. If yes, returns the leader; // otherwise returns a bool which indicates if it is needed to check later. func (m *EmbeddedEtcdMember) CheckLeader() (ElectionLeader, bool) { - if err := m.PrecheckLeader(); err != nil { + if err := m.PreCheckLeader(); err != nil { log.Error("failed to pass pre-check, check pd leader later", errs.ZapError(err)) time.Sleep(200 * time.Millisecond) return nil, true diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 928328b214f..8fb313cd242 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -31,6 +31,8 @@ import ( "go.uber.org/zap" ) +type leadershipCheckFunc func(*election.Leadership) bool + // Participant is used for the election related logic. Compared to its counterpart // EmbeddedEtcdMember, Participant relies on etcd for election, but it's decoupled // from the embedded etcd. It implements Member interface. @@ -46,6 +48,9 @@ type Participant struct { // leader key when this participant is successfully elected as the leader of // the group. Every write will use it to check the leadership. memberValue string + // preCampaignChecker is called before the campaign. If it returns false, the + // campaign will be skipped. + preCampaignChecker leadershipCheckFunc } // NewParticipant create a new Participant. @@ -162,6 +167,9 @@ func (m *Participant) GetLeadership() *election.Leadership { // CampaignLeader is used to campaign the leadership and make it become a leader. func (m *Participant) CampaignLeader(leaseTimeout int64) error { + if m.preCampaignChecker != nil && !m.preCampaignChecker(m.leadership) { + return errs.ErrPreCheckCampaign + } return m.leadership.Campaign(leaseTimeout, m.MemberValue()) } @@ -170,9 +178,9 @@ func (m *Participant) KeepLeader(ctx context.Context) { m.leadership.Keep(ctx) } -// PrecheckLeader does some pre-check before checking whether or not it's the leader. +// PreCheckLeader does some pre-check before checking whether or not it's the leader. // It returns true if it passes the pre-check, false otherwise. -func (m *Participant) PrecheckLeader() error { +func (m *Participant) PreCheckLeader() error { // No specific thing to check. Returns no error. return nil } @@ -194,7 +202,7 @@ func (m *Participant) getPersistentLeader() (*tsopb.Participant, int64, error) { // CheckLeader checks if someone else is taking the leadership. If yes, returns the leader; // otherwise returns a bool which indicates if it is needed to check later. func (m *Participant) CheckLeader() (ElectionLeader, bool) { - if err := m.PrecheckLeader(); err != nil { + if err := m.PreCheckLeader(); err != nil { log.Error("failed to pass pre-check, check the leader later", errs.ZapError(errs.ErrEtcdLeaderNotFound)) time.Sleep(200 * time.Millisecond) return nil, true @@ -328,3 +336,8 @@ func (m *Participant) GetLeaderPriority(id uint64) (int, error) { } return int(priority), nil } + +// SetPreCampaignChecker sets the pre-campaign checker. +func (m *Participant) SetPreCampaignChecker(checker leadershipCheckFunc) { + m.preCampaignChecker = checker +} diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go index 9212d14c96d..7d63321230f 100644 --- a/pkg/storage/endpoint/tso_keyspace_group.go +++ b/pkg/storage/endpoint/tso_keyspace_group.go @@ -65,16 +65,20 @@ type KeyspaceGroupMember struct { Address string `json:"address"` } +// SplitState defines the split state of a keyspace group. +type SplitState struct { + // SplitSource is the current keyspace group ID from which the keyspace group is split. + // When the keyspace group is being split to another keyspace group, the split-source will + // be set to its own ID. + SplitSource uint32 `json:"split-source"` +} + // KeyspaceGroup is the keyspace group. type KeyspaceGroup struct { ID uint32 `json:"id"` UserKind string `json:"user-kind"` - // InSplit indicates whether the keyspace group is in split. - // Both the split-from and split-to keyspace groups wll be in split state. - // Once in split state, the keyspace group will not be able to be updated externally. - InSplit bool `json:"in-split"` - // SplitFrom is the keyspace group ID from which the keyspace group is split. - SplitFrom uint32 `json:"split-from"` + // SplitState is the current split state of the keyspace group. + SplitState *SplitState `json:"split-state,omitempty"` // Members are the election members which campaign for the primary of the keyspace group. Members []KeyspaceGroupMember `json:"members"` // Keyspaces are the keyspace IDs which belong to the keyspace group. @@ -84,6 +88,30 @@ type KeyspaceGroup struct { KeyspaceLookupTable map[uint32]struct{} `json:"-"` } +// IsSplitting checks if the keyspace group is in split state. +func (kg *KeyspaceGroup) IsSplitting() bool { + return kg != nil && kg.SplitState != nil +} + +// IsSplitTarget checks if the keyspace group is in split state and is the split target. +func (kg *KeyspaceGroup) IsSplitTarget() bool { + return kg.IsSplitting() && kg.SplitState.SplitSource != kg.ID +} + +// IsSplitSource checks if the keyspace group is in split state and is the split source. +func (kg *KeyspaceGroup) IsSplitSource() bool { + return kg.IsSplitting() && kg.SplitState.SplitSource == kg.ID +} + +// SplitSource returns the keyspace group split source ID. When the keyspace group is the split source +// itself, it will return its own ID. +func (kg *KeyspaceGroup) SplitSource() uint32 { + if kg.IsSplitting() { + return kg.SplitState.SplitSource + } + return 0 +} + // KeyspaceGroupStorage is the interface for keyspace group storage. type KeyspaceGroupStorage interface { LoadKeyspaceGroups(startID uint32, limit int) ([]*KeyspaceGroup, error) diff --git a/pkg/tso/admin.go b/pkg/tso/admin.go index 337d09e677a..7d510cdef65 100644 --- a/pkg/tso/admin.go +++ b/pkg/tso/admin.go @@ -25,7 +25,7 @@ import ( // Handler defines the common behaviors of a basic tso handler. type Handler interface { - ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error + ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, keyspaceGroupID uint32) error } // AdminHandler wrap the basic tso handler to provide http service. @@ -93,7 +93,7 @@ func (h *AdminHandler) ResetTS(w http.ResponseWriter, r *http.Request) { ignoreSmaller, skipUpperBoundCheck = true, true } - if err = handler.ResetTS(ts, ignoreSmaller, skipUpperBoundCheck); err != nil { + if err = handler.ResetTS(ts, ignoreSmaller, skipUpperBoundCheck, 0); err != nil { if err == errs.ErrServerNotStarted { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) } else { diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 2c98fcb5988..c2723879ef5 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -139,8 +139,8 @@ type ElectionMember interface { GetDCLocationPathPrefix() string // GetDCLocationPath returns the dc-location path of a member with the given member ID. GetDCLocationPath(id uint64) string - // PrecheckLeader does some pre-check before checking whether it's the leader. - PrecheckLeader() error + // PreCheckLeader does some pre-check before checking whether it's the leader. + PreCheckLeader() error } // AllocatorManager is used to manage the TSO Allocators a PD server holds. diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 3badc064190..c7c9460a8c1 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -16,12 +16,12 @@ package tso import ( "context" + "errors" "fmt" "sync" "sync/atomic" "time" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -380,7 +380,7 @@ func (gta *GlobalTSOAllocator) SyncMaxTS( if syncMaxTSResp.rpcRes.GetHeader().GetError() != nil { log.Error("sync max ts rpc failed, got an error", zap.String("local-allocator-leader-url", leaderConn.Target()), - errs.ZapError(errors.Errorf("%s", syncMaxTSResp.rpcRes.GetHeader().GetError().String()))) + errs.ZapError(errors.New(syncMaxTSResp.rpcRes.GetHeader().GetError().String()))) return } }(ctx, leaderConn, respCh) @@ -504,9 +504,12 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() { func (gta *GlobalTSOAllocator) campaignLeader() { log.Info("start to campaign the primary", zap.String("campaign-tso-primary-name", gta.member.Name())) if err := gta.am.member.CampaignLeader(gta.am.leaderLease); err != nil { - if err.Error() == errs.ErrEtcdTxnConflict.Error() { + if errors.Is(err, errs.ErrEtcdTxnConflict) { log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully", zap.String("campaign-tso-primary-name", gta.member.Name())) + } else if errors.Is(err, errs.ErrPreCheckCampaign) { + log.Info("campaign tso primary meets error due to pre-check campaign failed, the tso keyspace group may be in split", + zap.String("campaign-tso-primary-name", gta.member.Name())) } else { log.Error("campaign tso primary meets error due to etcd error", zap.String("campaign-tso-primary-name", gta.member.Name()), errs.ZapError(err)) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 7c93a81ce59..5b665bbcfcb 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -19,6 +19,7 @@ import ( "encoding/json" "errors" "fmt" + "net/http" "path" "sort" "strings" @@ -29,15 +30,18 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/memberutil" + "github.com/tikv/pd/pkg/utils/tsoutil" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" ) @@ -150,6 +154,7 @@ type KeyspaceGroupManager struct { // tsoServiceID is the service ID of the TSO service, registered in the service discovery tsoServiceID *discovery.ServiceRegistryEntry etcdClient *clientv3.Client + httpClient *http.Client // electionNamePrefix is the name prefix to generate the unique name of a participant, // which participate in the election of its keyspace group's primary, in the format of // "electionNamePrefix:keyspace-group-id" @@ -197,6 +202,7 @@ func NewKeyspaceGroupManager( ctx context.Context, tsoServiceID *discovery.ServiceRegistryEntry, etcdClient *clientv3.Client, + httpClient *http.Client, electionNamePrefix string, legacySvcRootPath string, tsoSvcRootPath string, @@ -214,6 +220,7 @@ func NewKeyspaceGroupManager( cancel: cancel, tsoServiceID: tsoServiceID, etcdClient: etcdClient, + httpClient: httpClient, electionNamePrefix: electionNamePrefix, legacySvcRootPath: legacySvcRootPath, tsoSvcRootPath: tsoSvcRootPath, @@ -524,61 +531,76 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro log.Warn("keyspace group ID is invalid, ignore it", zap.Error(err)) return } - - assignedToMe := kgm.isAssignedToMe(group) - if assignedToMe { - if oldAM, oldGroup := kgm.state.getKeyspaceGroupMeta(group.ID); oldAM != nil { - log.Info("keyspace group already initialized, so update meta only", - zap.Uint32("keyspace-group-id", group.ID)) - kgm.updateKeyspaceGroupMembership(oldGroup, group) - return - } - - uniqueName := fmt.Sprintf("%s-%05d", kgm.electionNamePrefix, group.ID) - uniqueID := memberutil.GenerateUniqueID(uniqueName) - log.Info("joining primary election", - zap.Uint32("keyspace-group-id", group.ID), - zap.String("participant-name", uniqueName), - zap.Uint64("participant-id", uniqueID)) - - // TODO: handle the keyspace group & TSO split logic. - participant := member.NewParticipant(kgm.etcdClient) - participant.InitInfo( - uniqueName, uniqueID, path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", group.ID)), - primaryElectionSuffix, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr()) - - // Only the default keyspace group uses the legacy service root path for LoadTimestamp/SyncTimestamp. - var ( - tsRootPath string - storage *endpoint.StorageEndpoint - ) - if group.ID == mcsutils.DefaultKeyspaceGroupID { - tsRootPath = kgm.legacySvcRootPath - storage = kgm.legacySvcStorage - } else { - tsRootPath = kgm.tsoSvcRootPath - storage = kgm.tsoSvcStorage - } - - am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true) - - kgm.Lock() - group.KeyspaceLookupTable = make(map[uint32]struct{}) - for _, kid := range group.Keyspaces { - group.KeyspaceLookupTable[kid] = struct{}{} - kgm.keyspaceLookupTable[kid] = group.ID - } - kgm.kgs[group.ID] = group - kgm.ams[group.ID] = am - kgm.Unlock() - } else { + // Not assigned to me. If this host/pod owns this keyspace group, it should resign. + if !kgm.isAssignedToMe(group) { if group.ID == mcsutils.DefaultKeyspaceGroupID { log.Info("resign default keyspace group membership", zap.Any("default-keyspace-group", group)) } - // Not assigned to me. If this host/pod owns this keyspace group, it should resign. kgm.deleteKeyspaceGroup(group.ID) + return + } + // If the keyspace group is already initialized, just update the meta. + if oldAM, oldGroup := kgm.state.getKeyspaceGroupMeta(group.ID); oldAM != nil { + log.Info("keyspace group already initialized, so update meta only", + zap.Uint32("keyspace-group-id", group.ID)) + kgm.updateKeyspaceGroupMembership(oldGroup, group) + return } + // If the keyspace group is not initialized, initialize it. + uniqueName := fmt.Sprintf("%s-%05d", kgm.electionNamePrefix, group.ID) + uniqueID := memberutil.GenerateUniqueID(uniqueName) + log.Info("joining primary election", + zap.Uint32("keyspace-group-id", group.ID), + zap.String("participant-name", uniqueName), + zap.Uint64("participant-id", uniqueID)) + // Initialize the participant info to join the primary election. + participant := member.NewParticipant(kgm.etcdClient) + participant.InitInfo( + uniqueName, uniqueID, path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", group.ID)), + primaryElectionSuffix, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr()) + // If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group + // is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot + // be broken until the entire split process is completed. + if group.IsSplitTarget() { + splitSource := group.SplitSource() + log.Info("keyspace group is in split", + zap.Uint32("keyspace-group-id", group.ID), + zap.Uint32("source", splitSource)) + splitSourceAM, _ := kgm.getKeyspaceGroupMeta(splitSource) + if splitSourceAM == nil { + // TODO: guarantee that the split source keyspace group is initialized before. + log.Error("the split source keyspace group is not initialized", + zap.Uint32("source", splitSource)) + return + } + participant.SetPreCampaignChecker(func(leadership *election.Leadership) bool { + return splitSourceAM.getMember().IsLeader() + }) + } + // Only the default keyspace group uses the legacy service root path for LoadTimestamp/SyncTimestamp. + var ( + tsRootPath string + storage *endpoint.StorageEndpoint + ) + if group.ID == mcsutils.DefaultKeyspaceGroupID { + tsRootPath = kgm.legacySvcRootPath + storage = kgm.legacySvcStorage + } else { + tsRootPath = kgm.tsoSvcRootPath + storage = kgm.tsoSvcStorage + } + // Initialize all kinds of maps. + am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true) + kgm.Lock() + group.KeyspaceLookupTable = make(map[uint32]struct{}) + for _, kid := range group.Keyspaces { + group.KeyspaceLookupTable[kid] = struct{}{} + kgm.keyspaceLookupTable[kid] = group.ID + } + kgm.kgs[group.ID] = group + kgm.ams[group.ID] = am + kgm.Unlock() } // updateKeyspaceGroupMembership updates the keyspace lookup table for the given keyspace group. @@ -648,6 +670,10 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( } } } + // Check if the split is completed. + if oldGroup.IsSplitTarget() && !newGroup.IsSplitting() { + kgm.ams[groupID].getMember().(*member.Participant).SetPreCampaignChecker(nil) + } kgm.kgs[groupID] = newGroup } @@ -716,6 +742,10 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest( if err != nil { return pdpb.Timestamp{}, currentKeyspaceGroupID, err } + err = kgm.checkTSOSplit(currentKeyspaceGroupID, dcLocation) + if err != nil { + return pdpb.Timestamp{}, currentKeyspaceGroupID, err + } ts, err = am.HandleRequest(dcLocation, count) return ts, keyspaceGroupID, err } @@ -734,3 +764,73 @@ func genNotServedErr(perr *perrors.Error, keyspaceGroupID uint32) error { "requested keyspace group with id %d %s by this host/pod", keyspaceGroupID, errs.NotServedErr)) } + +// checkTSOSplit checks if the given keyspace group is in split state, and if so, it will make sure the +// newly split TSO keep consistent with the original one. +func (kgm *KeyspaceGroupManager) checkTSOSplit( + keyspaceGroupID uint32, + dcLocation string, +) error { + splitAM, splitGroup := kgm.getKeyspaceGroupMeta(keyspaceGroupID) + // Only the split target keyspace group needs to check the TSO split. + if !splitGroup.IsSplitTarget() { + return nil + } + splitSource := splitGroup.SplitSource() + splitSourceAM, splitSourceGroup := kgm.getKeyspaceGroupMeta(splitSource) + if splitSourceAM == nil || splitSourceGroup == nil { + log.Error("the split source keyspace group is not initialized", + zap.Uint32("source", splitSource)) + return errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(splitSource) + } + splitAllocator, err := splitAM.GetAllocator(dcLocation) + if err != nil { + return err + } + splitSourceAllocator, err := splitSourceAM.GetAllocator(dcLocation) + if err != nil { + return err + } + splitTSO, err := splitAllocator.GenerateTSO(1) + if err != nil { + return err + } + splitSourceTSO, err := splitSourceAllocator.GenerateTSO(1) + if err != nil { + return err + } + if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTSO) <= 0 { + return nil + } + // If the split source TSO is greater than the newly split TSO, we need to update the split + // TSO to make sure the following TSO will be greater than the split keyspaces ever had + // in the past. + splitSourceTSO.Physical += 1 + err = splitAllocator.SetTSO(tsoutil.GenerateTS(&splitSourceTSO), true, true) + if err != nil { + return err + } + // Finish the split state. + return kgm.finishSplitKeyspaceGroup(keyspaceGroupID) +} + +const keyspaceGroupsAPIPrefix = "/pd/api/v2/tso/keyspace-groups" + +func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error { + if kgm.httpClient == nil { + return nil + } + statusCode, err := apiutil.DoDelete( + kgm.httpClient, + kgm.cfg.GeBackendEndpoints()+keyspaceGroupsAPIPrefix+fmt.Sprintf("/%d/split", id)) + if err != nil { + return err + } + if statusCode != http.StatusOK { + log.Warn("failed to finish split keyspace group", + zap.Uint32("keyspace-group-id", id), + zap.Int("status-code", statusCode)) + return errs.ErrSendRequest.FastGenByArgs() + } + return nil +} diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 33f0b72c1f7..667d5b76dac 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -538,7 +538,7 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( electionNamePrefix, legacySvcRootPath, tsoSvcRootPath string, ) *KeyspaceGroupManager { return NewKeyspaceGroupManager( - suite.ctx, tsoServiceID, suite.etcdClient, + suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath, suite.cfg) } @@ -546,7 +546,7 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( // runTestLoadMultipleKeyspaceGroupsAssignment tests the loading of multiple keyspace group assignment. func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( re *require.Assertions, - numberOfKeypaceGroupsToAdd int, + numberOfKeyspaceGroupsToAdd int, loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value probabilityAssignToMe int, // percentage of assigning keyspace groups to this host/pod ) { @@ -558,14 +558,14 @@ func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( step := 30 mux := sync.Mutex{} wg := sync.WaitGroup{} - for i := 0; i < numberOfKeypaceGroupsToAdd; i += step { + for i := 0; i < numberOfKeyspaceGroupsToAdd; i += step { wg.Add(1) go func(startID int) { defer wg.Done() endID := startID + step - if endID > numberOfKeypaceGroupsToAdd { - endID = numberOfKeypaceGroupsToAdd + if endID > numberOfKeyspaceGroupsToAdd { + endID = numberOfKeyspaceGroupsToAdd } randomGen := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -593,7 +593,7 @@ func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( re.NoError(err) // If no keyspace group is assigned to this host/pod, the default keyspace group should be initialized. - if numberOfKeypaceGroupsToAdd <= 0 { + if numberOfKeyspaceGroupsToAdd <= 0 { expectedGroupIDs = append(expectedGroupIDs, mcsutils.DefaultKeyspaceGroupID) } diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index 61908fe39a7..c3bb4c02aad 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -193,7 +193,7 @@ func (lta *LocalTSOAllocator) isSameAllocatorLeader(leader *pdpb.Member) bool { // CheckAllocatorLeader checks who is the current Local TSO Allocator leader, and returns true if it is needed to check later. func (lta *LocalTSOAllocator) CheckAllocatorLeader() (*pdpb.Member, int64, bool) { - if err := lta.allocatorManager.member.PrecheckLeader(); err != nil { + if err := lta.allocatorManager.member.PreCheckLeader(); err != nil { log.Error("no etcd leader, check local tso allocator leader later", zap.String("dc-location", lta.timestampOracle.dcLocation), errs.ZapError(err)) time.Sleep(200 * time.Millisecond) diff --git a/server/handler.go b/server/handler.go index 35cec65527f..bc427fa90de 100644 --- a/server/handler.go +++ b/server/handler.go @@ -926,7 +926,7 @@ func (h *Handler) GetOfflinePeer(typ statistics.RegionStatisticType) ([]*core.Re } // ResetTS resets the ts with specified tso. -func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error { +func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, _ uint32) error { log.Info("reset-ts", zap.Uint64("new-ts", ts), zap.Bool("ignore-smaller", ignoreSmaller), diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go new file mode 100644 index 00000000000..159887c1cd2 --- /dev/null +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -0,0 +1,153 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tso + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/tsopb" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/client/testutil" + tso "github.com/tikv/pd/pkg/mcs/tso/server" + "github.com/tikv/pd/pkg/storage/endpoint" + tsopkg "github.com/tikv/pd/pkg/tso" + "github.com/tikv/pd/pkg/utils/tempurl" + "github.com/tikv/pd/pkg/utils/tsoutil" + "github.com/tikv/pd/server/apiv2/handlers" + "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/integrations/mcs" + handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers" + "google.golang.org/grpc" +) + +type tsoKeyspaceGroupManagerTestSuite struct { + suite.Suite + + ctx context.Context + cancel context.CancelFunc + + // The PD cluster. + cluster *tests.TestCluster + // pdLeaderServer is the leader server of the PD cluster. + pdLeaderServer *tests.TestServer + // tsoServer is the TSO service provider. + tsoServer *tso.Server + tsoServerCleanup func() + tsoClientConn *grpc.ClientConn + + tsoClient tsopb.TSOClient +} + +func TestTSOKeyspaceGroupManager(t *testing.T) { + suite.Run(t, &tsoKeyspaceGroupManagerTestSuite{}) +} + +func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() { + re := suite.Require() + + var err error + suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) + re.NoError(err) + err = suite.cluster.RunInitialServers() + re.NoError(err) + leaderName := suite.cluster.WaitLeader() + suite.pdLeaderServer = suite.cluster.GetServer(leaderName) + re.NoError(suite.pdLeaderServer.BootstrapCluster()) + backendEndpoints := suite.pdLeaderServer.GetAddr() + suite.tsoServer, suite.tsoServerCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc()) + suite.tsoClientConn, suite.tsoClient = tso.MustNewGrpcClient(re, suite.tsoServer.GetAddr()) +} + +func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownSuite() { + suite.cancel() + suite.tsoClientConn.Close() + suite.tsoServerCleanup() + suite.cluster.Destroy() +} + +func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { + re := suite.Require() + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + // Create the keyspace group 1 with keyspaces [111, 222, 333]. + handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ + KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: 1, + UserKind: endpoint.Standard.String(), + Members: []endpoint.KeyspaceGroupMember{{Address: suite.tsoServer.GetAddr()}}, + Keyspaces: []uint32{111, 222, 333}, + }, + }, + }) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) + re.Equal(uint32(1), kg1.ID) + re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) + re.False(kg1.IsSplitting()) + // Get a TSO from the keyspace group 1. + var ts *pdpb.Timestamp + testutil.Eventually(re, func() bool { + resp, err := request(re, ctx, suite.tsoClient, 1, suite.pdLeaderServer.GetClusterID(), 222, 1) + ts = resp.GetTimestamp() + return err == nil && tsoutil.CompareTimestamp(ts, &pdpb.Timestamp{}) > 0 + }) + ts.Physical += time.Hour.Milliseconds() + // Set the TSO of the keyspace group 1 to a large value. + err := suite.tsoServer.GetHandler().ResetTS(tsoutil.GenerateTS(ts), false, true, 1) + re.NoError(err) + // Split the keyspace group 1 to 2. + handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ + NewID: 2, + Keyspaces: []uint32{222, 333}, + }) + kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2) + re.Equal(uint32(2), kg2.ID) + re.Equal([]uint32{222, 333}, kg2.Keyspaces) + re.True(kg2.IsSplitTarget()) + // Check the split TSO from keyspace group 2. + var splitTS *pdpb.Timestamp + testutil.Eventually(re, func() bool { + resp, err := request(re, ctx, suite.tsoClient, 1, suite.pdLeaderServer.GetClusterID(), 222, 2) + splitTS = resp.GetTimestamp() + return err == nil && tsoutil.CompareTimestamp(splitTS, &pdpb.Timestamp{}) > 0 + }) + re.Greater(tsoutil.CompareTimestamp(splitTS, ts), 0) +} + +func request( + re *require.Assertions, + ctx context.Context, client tsopb.TSOClient, count uint32, + clusterID uint64, keyspaceID, keyspaceGroupID uint32, +) (ts *tsopb.TsoResponse, err error) { + req := &tsopb.TsoRequest{ + Header: &tsopb.RequestHeader{ + ClusterId: clusterID, + KeyspaceId: keyspaceID, + KeyspaceGroupId: keyspaceGroupID, + }, + DcLocation: tsopkg.GlobalDCLocation, + Count: count, + } + tsoClient, err := client.Tso(ctx) + re.NoError(err) + defer tsoClient.CloseSend() + re.NoError(tsoClient.Send(req)) + return tsoClient.Recv() +} diff --git a/tests/integrations/tso/server_test.go b/tests/integrations/tso/server_test.go index 173d09ffdf0..c9c978e7d7f 100644 --- a/tests/integrations/tso/server_test.go +++ b/tests/integrations/tso/server_test.go @@ -107,9 +107,9 @@ func (suite *tsoServerTestSuite) getClusterID() uint64 { func (suite *tsoServerTestSuite) resetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) { var err error if suite.legacy { - err = suite.pdLeaderServer.GetServer().GetHandler().ResetTS(ts, ignoreSmaller, skipUpperBoundCheck) + err = suite.pdLeaderServer.GetServer().GetHandler().ResetTS(ts, ignoreSmaller, skipUpperBoundCheck, 0) } else { - err = suite.tsoServer.GetHandler().ResetTS(ts, ignoreSmaller, skipUpperBoundCheck) + err = suite.tsoServer.GetHandler().ResetTS(ts, ignoreSmaller, skipUpperBoundCheck, 0) } // Only this error is acceptable. if err != nil { diff --git a/tests/server/apiv2/handlers/keyspace_test.go b/tests/server/apiv2/handlers/keyspace_test.go index b9b9742b2dc..9235160c6cb 100644 --- a/tests/server/apiv2/handlers/keyspace_test.go +++ b/tests/server/apiv2/handlers/keyspace_test.go @@ -12,15 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package handlers_test +package handlers import ( - "bytes" "context" - "encoding/json" "fmt" - "io" - "net/http" "testing" "github.com/pingcap/kvproto/pkg/keyspacepb" @@ -33,15 +29,6 @@ import ( "go.uber.org/goleak" ) -const keyspacesPrefix = "/pd/api/v2/keyspaces" - -// dialClient used to dial http request. -var dialClient = &http.Client{ - Transport: &http.Transport{ - DisableKeepAlives: true, - }, -} - func TestMain(m *testing.M) { goleak.VerifyTestMain(m, testutil.LeakOptions...) } @@ -151,44 +138,6 @@ func (suite *keyspaceTestSuite) TestLoadRangeKeyspace() { re.Equal(keyspacepb.KeyspaceState_ENABLED, loadResponse.Keyspaces[0].State) } -func sendLoadRangeRequest(re *require.Assertions, server *tests.TestServer, token, limit string) *handlers.LoadAllKeyspacesResponse { - // Construct load range request. - httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+keyspacesPrefix, nil) - re.NoError(err) - query := httpReq.URL.Query() - query.Add("page_token", token) - query.Add("limit", limit) - httpReq.URL.RawQuery = query.Encode() - // Send request. - httpResp, err := dialClient.Do(httpReq) - re.NoError(err) - defer httpResp.Body.Close() - re.Equal(http.StatusOK, httpResp.StatusCode) - // Receive & decode response. - data, err := io.ReadAll(httpResp.Body) - re.NoError(err) - resp := &handlers.LoadAllKeyspacesResponse{} - re.NoError(json.Unmarshal(data, resp)) - return resp -} - -func sendUpdateStateRequest(re *require.Assertions, server *tests.TestServer, name string, request *handlers.UpdateStateParam) (bool, *keyspacepb.KeyspaceMeta) { - data, err := json.Marshal(request) - re.NoError(err) - httpReq, err := http.NewRequest(http.MethodPut, server.GetAddr()+keyspacesPrefix+"/"+name+"/state", bytes.NewBuffer(data)) - re.NoError(err) - httpResp, err := dialClient.Do(httpReq) - re.NoError(err) - defer httpResp.Body.Close() - if httpResp.StatusCode != http.StatusOK { - return false, nil - } - data, err = io.ReadAll(httpResp.Body) - re.NoError(err) - meta := &handlers.KeyspaceMeta{} - re.NoError(json.Unmarshal(data, meta)) - return true, meta.KeyspaceMeta -} func mustMakeTestKeyspaces(re *require.Assertions, server *tests.TestServer, count int) []*keyspacepb.KeyspaceMeta { testConfig := map[string]string{ "config1": "100", @@ -205,58 +154,6 @@ func mustMakeTestKeyspaces(re *require.Assertions, server *tests.TestServer, cou return resultMeta } -func mustCreateKeyspace(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceParams) *keyspacepb.KeyspaceMeta { - data, err := json.Marshal(request) - re.NoError(err) - httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspacesPrefix, bytes.NewBuffer(data)) - re.NoError(err) - resp, err := dialClient.Do(httpReq) - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) - data, err = io.ReadAll(resp.Body) - re.NoError(err) - meta := &handlers.KeyspaceMeta{} - re.NoError(json.Unmarshal(data, meta)) - checkCreateRequest(re, request, meta.KeyspaceMeta) - return meta.KeyspaceMeta -} - -func mustUpdateKeyspaceConfig(re *require.Assertions, server *tests.TestServer, name string, request *handlers.UpdateConfigParams) *keyspacepb.KeyspaceMeta { - data, err := json.Marshal(request) - re.NoError(err) - httpReq, err := http.NewRequest(http.MethodPatch, server.GetAddr()+keyspacesPrefix+"/"+name+"/config", bytes.NewBuffer(data)) - re.NoError(err) - resp, err := dialClient.Do(httpReq) - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) - data, err = io.ReadAll(resp.Body) - re.NoError(err) - meta := &handlers.KeyspaceMeta{} - re.NoError(json.Unmarshal(data, meta)) - return meta.KeyspaceMeta -} - -func mustLoadKeyspaces(re *require.Assertions, server *tests.TestServer, name string) *keyspacepb.KeyspaceMeta { - resp, err := dialClient.Get(server.GetAddr() + keyspacesPrefix + "/" + name) - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) - data, err := io.ReadAll(resp.Body) - re.NoError(err) - meta := &handlers.KeyspaceMeta{} - re.NoError(json.Unmarshal(data, meta)) - return meta.KeyspaceMeta -} - -// checkCreateRequest verifies a keyspace meta matches a create request. -func checkCreateRequest(re *require.Assertions, request *handlers.CreateKeyspaceParams, meta *keyspacepb.KeyspaceMeta) { - re.Equal(request.Name, meta.Name) - re.Equal(keyspacepb.KeyspaceState_ENABLED, meta.State) - re.Equal(request.Config, meta.Config) -} - // checkUpdateRequest verifies a keyspace meta matches a update request. func checkUpdateRequest(re *require.Assertions, request *handlers.UpdateConfigParams, oldConfig, newConfig map[string]string) { expected := map[string]string{} diff --git a/tests/server/apiv2/handlers/testutil.go b/tests/server/apiv2/handlers/testutil.go new file mode 100644 index 00000000000..6aee761eedb --- /dev/null +++ b/tests/server/apiv2/handlers/testutil.go @@ -0,0 +1,204 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handlers + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/pingcap/kvproto/pkg/keyspacepb" + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/server/apiv2/handlers" + "github.com/tikv/pd/tests" +) + +const ( + keyspacesPrefix = "/pd/api/v2/keyspaces" + keyspaceGroupsPrefix = "/pd/api/v2/tso/keyspace-groups" +) + +// dialClient used to dial http request. +var dialClient = &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + }, +} + +func sendLoadRangeRequest(re *require.Assertions, server *tests.TestServer, token, limit string) *handlers.LoadAllKeyspacesResponse { + // Construct load range request. + httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+keyspacesPrefix, nil) + re.NoError(err) + query := httpReq.URL.Query() + query.Add("page_token", token) + query.Add("limit", limit) + httpReq.URL.RawQuery = query.Encode() + // Send request. + httpResp, err := dialClient.Do(httpReq) + re.NoError(err) + defer httpResp.Body.Close() + re.Equal(http.StatusOK, httpResp.StatusCode) + // Receive & decode response. + data, err := io.ReadAll(httpResp.Body) + re.NoError(err) + resp := &handlers.LoadAllKeyspacesResponse{} + re.NoError(json.Unmarshal(data, resp)) + return resp +} + +func sendUpdateStateRequest(re *require.Assertions, server *tests.TestServer, name string, request *handlers.UpdateStateParam) (bool, *keyspacepb.KeyspaceMeta) { + data, err := json.Marshal(request) + re.NoError(err) + httpReq, err := http.NewRequest(http.MethodPut, server.GetAddr()+keyspacesPrefix+"/"+name+"/state", bytes.NewBuffer(data)) + re.NoError(err) + httpResp, err := dialClient.Do(httpReq) + re.NoError(err) + defer httpResp.Body.Close() + if httpResp.StatusCode != http.StatusOK { + return false, nil + } + data, err = io.ReadAll(httpResp.Body) + re.NoError(err) + meta := &handlers.KeyspaceMeta{} + re.NoError(json.Unmarshal(data, meta)) + return true, meta.KeyspaceMeta +} + +func mustCreateKeyspace(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceParams) *keyspacepb.KeyspaceMeta { + data, err := json.Marshal(request) + re.NoError(err) + httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspacesPrefix, bytes.NewBuffer(data)) + re.NoError(err) + resp, err := dialClient.Do(httpReq) + re.NoError(err) + defer resp.Body.Close() + re.Equal(http.StatusOK, resp.StatusCode) + data, err = io.ReadAll(resp.Body) + re.NoError(err) + meta := &handlers.KeyspaceMeta{} + re.NoError(json.Unmarshal(data, meta)) + checkCreateRequest(re, request, meta.KeyspaceMeta) + return meta.KeyspaceMeta +} + +// checkCreateRequest verifies a keyspace meta matches a create request. +func checkCreateRequest(re *require.Assertions, request *handlers.CreateKeyspaceParams, meta *keyspacepb.KeyspaceMeta) { + re.Equal(request.Name, meta.Name) + re.Equal(keyspacepb.KeyspaceState_ENABLED, meta.State) + re.Equal(request.Config, meta.Config) +} + +func mustUpdateKeyspaceConfig(re *require.Assertions, server *tests.TestServer, name string, request *handlers.UpdateConfigParams) *keyspacepb.KeyspaceMeta { + data, err := json.Marshal(request) + re.NoError(err) + httpReq, err := http.NewRequest(http.MethodPatch, server.GetAddr()+keyspacesPrefix+"/"+name+"/config", bytes.NewBuffer(data)) + re.NoError(err) + resp, err := dialClient.Do(httpReq) + re.NoError(err) + defer resp.Body.Close() + re.Equal(http.StatusOK, resp.StatusCode) + data, err = io.ReadAll(resp.Body) + re.NoError(err) + meta := &handlers.KeyspaceMeta{} + re.NoError(json.Unmarshal(data, meta)) + return meta.KeyspaceMeta +} + +func mustLoadKeyspaces(re *require.Assertions, server *tests.TestServer, name string) *keyspacepb.KeyspaceMeta { + resp, err := dialClient.Get(server.GetAddr() + keyspacesPrefix + "/" + name) + re.NoError(err) + defer resp.Body.Close() + re.Equal(http.StatusOK, resp.StatusCode) + data, err := io.ReadAll(resp.Body) + re.NoError(err) + meta := &handlers.KeyspaceMeta{} + re.NoError(json.Unmarshal(data, meta)) + return meta.KeyspaceMeta +} + +func sendLoadKeyspaceGroupRequest(re *require.Assertions, server *tests.TestServer, token, limit string) []*endpoint.KeyspaceGroup { + // Construct load range request. + httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+keyspaceGroupsPrefix, nil) + re.NoError(err) + query := httpReq.URL.Query() + query.Add("page_token", token) + query.Add("limit", limit) + httpReq.URL.RawQuery = query.Encode() + // Send request. + httpResp, err := dialClient.Do(httpReq) + re.NoError(err) + defer httpResp.Body.Close() + re.Equal(http.StatusOK, httpResp.StatusCode) + // Receive & decode response. + data, err := io.ReadAll(httpResp.Body) + re.NoError(err) + var resp []*endpoint.KeyspaceGroup + re.NoError(json.Unmarshal(data, &resp)) + return resp +} + +// MustLoadKeyspaceGroupByID loads the keyspace group by ID with HTTP API. +func MustLoadKeyspaceGroupByID(re *require.Assertions, server *tests.TestServer, id uint32) *endpoint.KeyspaceGroup { + httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d", id), nil) + re.NoError(err) + resp, err := dialClient.Do(httpReq) + re.NoError(err) + defer resp.Body.Close() + re.Equal(http.StatusOK, resp.StatusCode) + data, err := io.ReadAll(resp.Body) + re.NoError(err) + var kg endpoint.KeyspaceGroup + re.NoError(json.Unmarshal(data, &kg)) + return &kg +} + +// MustCreateKeyspaceGroup creates a keyspace group with HTTP API. +func MustCreateKeyspaceGroup(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceGroupParams) { + data, err := json.Marshal(request) + re.NoError(err) + httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspaceGroupsPrefix, bytes.NewBuffer(data)) + re.NoError(err) + resp, err := dialClient.Do(httpReq) + re.NoError(err) + defer resp.Body.Close() + re.Equal(http.StatusOK, resp.StatusCode) +} + +// MustSplitKeyspaceGroup updates a keyspace group with HTTP API. +func MustSplitKeyspaceGroup(re *require.Assertions, server *tests.TestServer, id uint32, request *handlers.SplitKeyspaceGroupByIDParams) { + data, err := json.Marshal(request) + re.NoError(err) + httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d/split", id), bytes.NewBuffer(data)) + re.NoError(err) + // Send request. + resp, err := dialClient.Do(httpReq) + re.NoError(err) + defer resp.Body.Close() + re.Equal(http.StatusOK, resp.StatusCode) +} + +// MustFinishSplitKeyspaceGroup finishes a keyspace group split with HTTP API. +func MustFinishSplitKeyspaceGroup(re *require.Assertions, server *tests.TestServer, id uint32) { + httpReq, err := http.NewRequest(http.MethodDelete, server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d/split", id), nil) + re.NoError(err) + // Send request. + resp, err := dialClient.Do(httpReq) + re.NoError(err) + defer resp.Body.Close() + re.Equal(http.StatusOK, resp.StatusCode) +} diff --git a/tests/server/apiv2/handlers/tso_keyspace_group_test.go b/tests/server/apiv2/handlers/tso_keyspace_group_test.go index 1eb542cb83f..bdc89428669 100644 --- a/tests/server/apiv2/handlers/tso_keyspace_group_test.go +++ b/tests/server/apiv2/handlers/tso_keyspace_group_test.go @@ -12,26 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -package handlers_test +package handlers import ( - "bytes" "context" - "encoding/json" - "fmt" - "io" - "net/http" "testing" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/server/apiv2/handlers" "github.com/tikv/pd/tests" ) -const keyspaceGroupsPrefix = "/pd/api/v2/tso/keyspace-groups" - type keyspaceGroupTestSuite struct { suite.Suite ctx context.Context @@ -73,7 +65,7 @@ func (suite *keyspaceGroupTestSuite) TestCreateKeyspaceGroups() { }, }} - mustCreateKeyspaceGroup(re, suite.server, kgs) + MustCreateKeyspaceGroup(re, suite.server, kgs) } func (suite *keyspaceGroupTestSuite) TestLoadKeyspaceGroup() { @@ -89,7 +81,7 @@ func (suite *keyspaceGroupTestSuite) TestLoadKeyspaceGroup() { }, }} - mustCreateKeyspaceGroup(re, suite.server, kgs) + MustCreateKeyspaceGroup(re, suite.server, kgs) resp := sendLoadKeyspaceGroupRequest(re, suite.server, "0", "0") re.Len(resp, 3) } @@ -104,101 +96,34 @@ func (suite *keyspaceGroupTestSuite) TestSplitKeyspaceGroup() { }, }} - mustCreateKeyspaceGroup(re, suite.server, kgs) + MustCreateKeyspaceGroup(re, suite.server, kgs) resp := sendLoadKeyspaceGroupRequest(re, suite.server, "0", "0") re.Len(resp, 2) - mustSplitKeyspaceGroup(re, suite.server, 1, &handlers.SplitKeyspaceGroupByIDParams{ + MustSplitKeyspaceGroup(re, suite.server, 1, &handlers.SplitKeyspaceGroupByIDParams{ NewID: uint32(2), Keyspaces: []uint32{111, 222}, }) resp = sendLoadKeyspaceGroupRequest(re, suite.server, "0", "0") re.Len(resp, 3) // Check keyspace group 1. - kg1 := mustLoadKeyspaceGroupByID(re, suite.server, 1) + kg1 := MustLoadKeyspaceGroupByID(re, suite.server, 1) re.Equal(uint32(1), kg1.ID) re.Equal([]uint32{333}, kg1.Keyspaces) - re.True(kg1.InSplit) - re.Empty(kg1.SplitFrom) + re.True(kg1.IsSplitSource()) + re.Equal(kg1.ID, kg1.SplitSource()) // Check keyspace group 2. - kg2 := mustLoadKeyspaceGroupByID(re, suite.server, 2) + kg2 := MustLoadKeyspaceGroupByID(re, suite.server, 2) re.Equal(uint32(2), kg2.ID) re.Equal([]uint32{111, 222}, kg2.Keyspaces) - re.True(kg2.InSplit) - re.Equal(kg1.ID, kg2.SplitFrom) + re.True(kg2.IsSplitTarget()) + re.Equal(kg1.ID, kg2.SplitSource()) // They should have the same user kind and members. re.Equal(kg1.UserKind, kg2.UserKind) re.Equal(kg1.Members, kg2.Members) // Finish the split and check the split state. - mustFinishSplitKeyspaceGroup(re, suite.server, 2) - kg2 = mustLoadKeyspaceGroupByID(re, suite.server, 2) - re.False(kg2.InSplit) - re.Equal(kg1.ID, kg2.SplitFrom) -} - -func sendLoadKeyspaceGroupRequest(re *require.Assertions, server *tests.TestServer, token, limit string) []*endpoint.KeyspaceGroup { - // Construct load range request. - httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+keyspaceGroupsPrefix, nil) - re.NoError(err) - query := httpReq.URL.Query() - query.Add("page_token", token) - query.Add("limit", limit) - httpReq.URL.RawQuery = query.Encode() - // Send request. - httpResp, err := dialClient.Do(httpReq) - re.NoError(err) - defer httpResp.Body.Close() - re.Equal(http.StatusOK, httpResp.StatusCode) - // Receive & decode response. - data, err := io.ReadAll(httpResp.Body) - re.NoError(err) - var resp []*endpoint.KeyspaceGroup - re.NoError(json.Unmarshal(data, &resp)) - return resp -} - -func mustLoadKeyspaceGroupByID(re *require.Assertions, server *tests.TestServer, id uint32) *endpoint.KeyspaceGroup { - httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d", id), nil) - re.NoError(err) - resp, err := dialClient.Do(httpReq) - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) - data, err := io.ReadAll(resp.Body) - re.NoError(err) - var kg endpoint.KeyspaceGroup - re.NoError(json.Unmarshal(data, &kg)) - return &kg -} - -func mustCreateKeyspaceGroup(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceGroupParams) { - data, err := json.Marshal(request) - re.NoError(err) - httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspaceGroupsPrefix, bytes.NewBuffer(data)) - re.NoError(err) - resp, err := dialClient.Do(httpReq) - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) -} - -func mustSplitKeyspaceGroup(re *require.Assertions, server *tests.TestServer, id uint32, request *handlers.SplitKeyspaceGroupByIDParams) { - data, err := json.Marshal(request) - re.NoError(err) - httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d/split", id), bytes.NewBuffer(data)) - re.NoError(err) - // Send request. - resp, err := dialClient.Do(httpReq) - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) -} - -func mustFinishSplitKeyspaceGroup(re *require.Assertions, server *tests.TestServer, id uint32) { - httpReq, err := http.NewRequest(http.MethodDelete, server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d/split", id), nil) - re.NoError(err) - // Send request. - resp, err := dialClient.Do(httpReq) - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) + MustFinishSplitKeyspaceGroup(re, suite.server, 2) + kg1 = MustLoadKeyspaceGroupByID(re, suite.server, 1) + re.False(kg1.IsSplitting()) + kg2 = MustLoadKeyspaceGroupByID(re, suite.server, 2) + re.False(kg2.IsSplitting()) }