Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyspace: avoid keyspace being updated during the split #6316

Merged
merged 1 commit into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 86 additions & 31 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func (m *GroupManager) DeleteKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGro
if kg == nil {
return nil
}
if kg.InSplit {
return ErrKeyspaceGroupInSplit
}
return m.store.DeleteKeyspaceGroup(txn, id)
}); err != nil {
return nil, err
Expand All @@ -171,6 +174,9 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
if oldKG != nil && !overwrite {
return ErrKeyspaceGroupExists
}
if oldKG != nil && oldKG.InSplit && overwrite {
return ErrKeyspaceGroupInSplit
}
m.store.SaveKeyspaceGroup(txn, &endpoint.KeyspaceGroup{
ID: keyspaceGroup.ID,
UserKind: keyspaceGroup.UserKind,
Expand Down Expand Up @@ -222,6 +228,9 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI
if kg == nil {
return errors.Errorf("keyspace group %d not found", id)
}
if kg.InSplit {
return ErrKeyspaceGroupInSplit
}
switch mutation {
case opAdd:
if !slice.Contains(kg.Keyspaces, keyspaceID) {
Expand Down Expand Up @@ -265,6 +274,9 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse
if newKG == nil {
return errors.Errorf("keyspace group %s not found in %s group", newGroupID, newUserKind)
}
if oldKG.InSplit || newKG.InSplit {
return ErrKeyspaceGroupInSplit
}

var updateOld, updateNew bool
if !slice.Contains(newKG.Keyspaces, keyspaceID) {
Expand Down Expand Up @@ -294,34 +306,40 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse

// SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID.
// And the keyspaces in the old keyspace group will be moved to the new keyspace group.
func (m *GroupManager) SplitKeyspaceGroupByID(id, newID uint32, keyspaces []uint32) error {
func (m *GroupManager) SplitKeyspaceGroupByID(splitFromID, splitToID uint32, keyspaces []uint32) error {
var splitFromKg, splitToKg *endpoint.KeyspaceGroup
m.Lock()
defer m.Unlock()
// TODO: avoid to split when the keyspaces is empty.
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
// Load the old keyspace group first.
oldKg, err := m.store.LoadKeyspaceGroup(txn, id)
splitFromKg, err = m.store.LoadKeyspaceGroup(txn, splitFromID)
if err != nil {
return err
}
if oldKg == nil {
if splitFromKg == nil {
return ErrKeyspaceGroupNotFound
}
if splitFromKg.InSplit {
return ErrKeyspaceGroupInSplit
}
// Check if the new keyspace group already exists.
newKg, err := m.store.LoadKeyspaceGroup(txn, newID)
splitToKg, err = m.store.LoadKeyspaceGroup(txn, splitToID)
if err != nil {
return err
}
if newKg != nil {
if splitToKg != nil {
return ErrKeyspaceGroupExists
}
// Check if the keyspaces are all in the old keyspace group.
if len(keyspaces) > len(oldKg.Keyspaces) {
if len(keyspaces) > len(splitFromKg.Keyspaces) {
return ErrKeyspaceNotInKeyspaceGroup
}
var (
oldKeyspaceMap = make(map[uint32]struct{}, len(oldKg.Keyspaces))
oldKeyspaceMap = make(map[uint32]struct{}, len(splitFromKg.Keyspaces))
newKeyspaceMap = make(map[uint32]struct{}, len(keyspaces))
)
for _, keyspace := range oldKg.Keyspaces {
for _, keyspace := range splitFromKg.Keyspaces {
oldKeyspaceMap[keyspace] = struct{}{}
}
for _, keyspace := range keyspaces {
Expand All @@ -331,47 +349,84 @@ func (m *GroupManager) SplitKeyspaceGroupByID(id, newID uint32, keyspaces []uint
newKeyspaceMap[keyspace] = struct{}{}
}
// Get the split keyspace group for the old keyspace group.
splitKeyspaces := make([]uint32, 0, len(oldKg.Keyspaces)-len(keyspaces))
for _, keyspace := range oldKg.Keyspaces {
splitKeyspaces := make([]uint32, 0, len(splitFromKg.Keyspaces)-len(keyspaces))
for _, keyspace := range splitFromKg.Keyspaces {
if _, ok := newKeyspaceMap[keyspace]; !ok {
splitKeyspaces = append(splitKeyspaces, keyspace)
}
}
// Update the old keyspace group.
oldKg.Keyspaces = splitKeyspaces
if err = m.store.SaveKeyspaceGroup(txn, oldKg); err != nil {
splitFromKg.Keyspaces = splitKeyspaces
splitFromKg.InSplit = true
if err = m.store.SaveKeyspaceGroup(txn, splitFromKg); err != nil {
return err
}
// Create the new split keyspace group.
return m.store.SaveKeyspaceGroup(txn, &endpoint.KeyspaceGroup{
ID: newID,
splitToKg = &endpoint.KeyspaceGroup{
ID: splitToID,
// Keep the same user kind and members as the old keyspace group.
UserKind: oldKg.UserKind,
Members: oldKg.Members,
UserKind: splitFromKg.UserKind,
Members: splitFromKg.Members,
Keyspaces: keyspaces,
// Only set the new keyspace group in split state.
InSplit: true,
SplitFrom: oldKg.ID,
})
})
SplitFrom: splitFromKg.ID,
}
// Create the new split keyspace group.
return m.store.SaveKeyspaceGroup(txn, splitToKg)
}); err != nil {
return err
}
// Update the keyspace group cache.
m.groups[endpoint.StringUserKind(splitFromKg.UserKind)].Put(splitFromKg)
m.groups[endpoint.StringUserKind(splitToKg.UserKind)].Put(splitToKg)
return nil
}

// FinishSplitKeyspaceByID finishes the split keyspace group by ID.
func (m *GroupManager) FinishSplitKeyspaceByID(id uint32) error {
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
// Load the keyspace group first.
kg, err := m.store.LoadKeyspaceGroup(txn, id)
// FinishSplitKeyspaceByID finishes the split keyspace group by the split-to ID.
func (m *GroupManager) FinishSplitKeyspaceByID(splitToID uint32) error {
var splitToKg, splitFromKg *endpoint.KeyspaceGroup
m.Lock()
defer m.Unlock()
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
// Load the split-to keyspace group first.
splitToKg, err = m.store.LoadKeyspaceGroup(txn, splitToID)
if err != nil {
return err
}
if kg == nil {
if splitToKg == nil {
return ErrKeyspaceGroupNotFound
}
// Check if it's in the split state.
if !kg.InSplit {
if !splitToKg.InSplit {
return ErrKeyspaceGroupNotInSplit
}
kg.InSplit = false
return m.store.SaveKeyspaceGroup(txn, kg)
})
// Load the split-from keyspace group then.
splitFromKg, err = m.store.LoadKeyspaceGroup(txn, splitToKg.SplitFrom)
if err != nil {
return err
}
if splitFromKg == nil {
return ErrKeyspaceGroupNotFound
}
if !splitFromKg.InSplit {
return ErrKeyspaceGroupNotInSplit
}
splitToKg.InSplit = false
splitFromKg.InSplit = false
err = m.store.SaveKeyspaceGroup(txn, splitToKg)
if err != nil {
return err
}
err = m.store.SaveKeyspaceGroup(txn, splitFromKg)
if err != nil {
return err
}
return nil
}); err != nil {
return err
}
// Update the keyspace group cache.
m.groups[endpoint.StringUserKind(splitToKg.UserKind)].Put(splitToKg)
m.groups[endpoint.StringUserKind(splitFromKg.UserKind)].Put(splitFromKg)
return nil
}
38 changes: 34 additions & 4 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
re.NoError(err)
re.Equal(uint32(2), kg2.ID)
re.Equal([]uint32{111, 222}, kg2.Keyspaces)
re.False(kg2.InSplit)
re.True(kg2.InSplit)
re.Empty(kg2.SplitFrom)
kg4, err := suite.kgm.GetKeyspaceGroupByID(4)
re.NoError(err)
Expand All @@ -263,20 +263,50 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
re.Equal(kg2.ID, kg4.SplitFrom)
re.Equal(kg2.UserKind, kg4.UserKind)
re.Equal(kg2.Members, kg4.Members)

// finish the split of the keyspace group 2
err = suite.kgm.FinishSplitKeyspaceByID(2)
re.ErrorIs(err, ErrKeyspaceGroupNotInSplit)
// finish the split of a non-existing keyspace group
err = suite.kgm.FinishSplitKeyspaceByID(5)
re.ErrorIs(err, ErrKeyspaceGroupNotFound)
// split the in-split keyspace group
err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{333})
re.ErrorIs(err, ErrKeyspaceGroupInSplit)
// remove the in-split keyspace group
kg2, err = suite.kgm.DeleteKeyspaceGroupByID(2)
re.Nil(kg2)
re.ErrorIs(err, ErrKeyspaceGroupInSplit)
kg4, err = suite.kgm.DeleteKeyspaceGroupByID(4)
re.Nil(kg4)
re.ErrorIs(err, ErrKeyspaceGroupInSplit)
// update the in-split keyspace group
err = suite.kg.kgm.UpdateKeyspaceForGroup(endpoint.Standard, "2", 444, opAdd)
re.ErrorIs(err, ErrKeyspaceGroupInSplit)
err = suite.kg.kgm.UpdateKeyspaceForGroup(endpoint.Standard, "4", 444, opAdd)
re.ErrorIs(err, ErrKeyspaceGroupInSplit)

// finish the split of keyspace group 4
err = suite.kgm.FinishSplitKeyspaceByID(4)
re.NoError(err)
kg2, err = suite.kgm.GetKeyspaceGroupByID(2)
re.NoError(err)
re.Equal(uint32(2), kg2.ID)
re.Equal([]uint32{111, 222}, kg2.Keyspaces)
re.False(kg2.InSplit)
re.Empty(kg2.SplitFrom)
kg4, err = suite.kgm.GetKeyspaceGroupByID(4)
re.NoError(err)
re.Equal(uint32(4), kg4.ID)
re.Equal([]uint32{333}, kg4.Keyspaces)
re.False(kg4.InSplit)
re.Equal(kg2.ID, kg4.SplitFrom)
re.Equal(kg2.UserKind, kg4.UserKind)
re.Equal(kg2.Members, kg4.Members)

// split a non-existing keyspace group
err = suite.kgm.SplitKeyspaceGroupByID(3, 5, nil)
re.ErrorIs(err, ErrKeyspaceGroupNotFound)
// finish the split of a non-existing keyspace group
err = suite.kgm.FinishSplitKeyspaceByID(5)
re.ErrorIs(err, ErrKeyspaceGroupNotFound)
// split into an existing keyspace group
err = suite.kgm.SplitKeyspaceGroupByID(2, 4, nil)
re.ErrorIs(err, ErrKeyspaceGroupExists)
Expand Down
2 changes: 2 additions & 0 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var (
ErrKeyspaceGroupExists = errors.New("keyspace group already exists")
// ErrKeyspaceGroupNotFound is used to indicate target keyspace group does not exist.
ErrKeyspaceGroupNotFound = errors.New("keyspace group does not exist")
// ErrKeyspaceGroupInSplit is used to indicate target keyspace group is in split state.
ErrKeyspaceGroupInSplit = errors.New("keyspace group is in split state")
// ErrKeyspaceGroupNotInSplit is used to indicate target keyspace group is not in split state.
ErrKeyspaceGroupNotInSplit = errors.New("keyspace group is not in split state")
// ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group.
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/endpoint/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type KeyspaceGroup struct {
ID uint32 `json:"id"`
UserKind string `json:"user-kind"`
// InSplit indicates whether the keyspace group is in split.
// Both the split-from and split-to keyspace groups wll be in split state.
// Once in split state, the keyspace group will not be able to be updated externally.
InSplit bool `json:"in-split"`
// SplitFrom is the keyspace group ID from which the keyspace group is split.
SplitFrom uint32 `json:"split-from"`
Expand Down
2 changes: 1 addition & 1 deletion tests/server/apiv2/handlers/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (suite *keyspaceGroupTestSuite) TestSplitKeyspaceGroup() {
kg1 := mustLoadKeyspaceGroupByID(re, suite.server, 1)
re.Equal(uint32(1), kg1.ID)
re.Equal([]uint32{333}, kg1.Keyspaces)
re.False(kg1.InSplit)
re.True(kg1.InSplit)
re.Empty(kg1.SplitFrom)
// Check keyspace group 2.
kg2 := mustLoadKeyspaceGroupByID(re, suite.server, 2)
Expand Down