Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: set tiflash placement group index to 120 #37179

Merged
merged 6 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,9 @@ func (d *ddl) PollTiFlashRoutine() {
if err != nil {
logutil.BgLogger().Fatal("TiFlashManagement init failed", zap.Error(err))
}

hasSetTiFlashGroup := false
nextSetTiFlashGroupTime := time.Now()
for {
select {
case <-d.ctx.Done():
Expand All @@ -586,6 +589,18 @@ func (d *ddl) PollTiFlashRoutine() {
failpoint.Inject("BeforePollTiFlashReplicaStatusLoop", func() {
failpoint.Continue()
})

if !hasSetTiFlashGroup && !time.Now().Before(nextSetTiFlashGroupTime) {
// We should set tiflash rule group a higher index than other placement groups to forbid override by them.
// Once `SetTiFlashGroupConfig` succeed, we do not need to invoke it again. If failed, we should retry it util success.
if err = infosync.SetTiFlashGroupConfig(d.ctx); err != nil {
logutil.BgLogger().Warn("SetTiFlashGroupConfig failed", zap.Error(err))
nextSetTiFlashGroupTime = time.Now().Add(time.Minute)
} else {
hasSetTiFlashGroup = true
}
}

sctx, err := d.sessPool.get()
if err == nil {
if d.ownerManager.IsOwner() {
Expand Down
16 changes: 16 additions & 0 deletions ddl/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,3 +995,19 @@ func TestTiFlashBatchUnsupported(t *testing.T) {
require.Equal(t, "In total 2 tables: 1 succeed, 0 failed, 1 skipped", tk.Session().GetSessionVars().StmtCtx.GetMessage())
tk.MustGetErrCode("alter database information_schema set tiflash replica 1", 8200)
}

func TestTiFlashGroupIndexWhenStartup(t *testing.T) {
s, teardown := createTiFlashContext(t)
defer teardown()
_ = testkit.NewTestKit(t, s.store)
timeout := time.Now().Add(10 * time.Second)
errMsg := "time out"
for time.Now().Before(timeout) {
time.Sleep(100 * time.Millisecond)
if s.tiflash.GroupIndex != 0 {
errMsg = "invalid group index"
break
}
}
require.Equal(t, uint64(120), s.tiflash.GroupIndex, errMsg)
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
}
2 changes: 2 additions & 0 deletions ddl/placement/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
)

const (
// TiFlashRuleGroupID is the rule group id of tiflash
TiFlashRuleGroupID = "tiflash"
// BundleIDPrefix is the bundle prefix of all rule bundles from TiDB_DDL statements.
BundleIDPrefix = "TiDB_DDL_"
// PDBundleID is the bundle name of pd, the default bundle for all regions.
Expand Down
7 changes: 7 additions & 0 deletions ddl/placement/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ const (
Learner PeerRoleType = "learner"
)

// RuleGroupConfig defines basic config of rule group
type RuleGroupConfig struct {
ID string `json:"id"`
Index int `json:"index"`
Override bool `json:"override"`
}

// Rule is the core placement rule struct. Check https://github.com/tikv/pd/blob/master/server/schedule/placement/rule.go.
type Rule struct {
GroupID string `json:"group_id"`
Expand Down
10 changes: 10 additions & 0 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,16 @@ func GetLabelRules(ctx context.Context, ruleIDs []string) (map[string]*label.Rul
return is.labelRuleManager.GetLabelRules(ctx, ruleIDs)
}

// SetTiFlashGroupConfig is a helper function to set tiflash rule group config
func SetTiFlashGroupConfig(ctx context.Context) error {
is, err := getGlobalInfoSyncer()
if err != nil {
return errors.Trace(err)
}
logutil.BgLogger().Info("SetTiFlashGroupConfig")
return is.tiflashPlacementManager.SetTiFlashGroupConfig(ctx)
}

// SetTiFlashPlacementRule is a helper function to set placement rule.
// It is discouraged to use SetTiFlashPlacementRule directly,
// use `ConfigureTiFlashPDForTable`/`ConfigureTiFlashPDForPartitions` instead.
Expand Down
71 changes: 70 additions & 1 deletion domain/infosync/tiflash_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import (

// TiFlashPlacementManager manages placement settings for TiFlash.
type TiFlashPlacementManager interface {
// SetTiFlashGroupConfig sets the group index of the tiflash placement rule
SetTiFlashGroupConfig(ctx context.Context) error
// SetPlacementRule is a helper function to set placement rule.
SetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error
// DeletePlacementRule is to delete placement rule for certain group.
Expand All @@ -69,8 +71,62 @@ func (m *TiFlashPDPlacementManager) Close(ctx context.Context) {

}

// SetTiFlashGroupConfig sets the tiflash's rule group config
func (m *TiFlashPDPlacementManager) SetTiFlashGroupConfig(ctx context.Context) error {
xhebox marked this conversation as resolved.
Show resolved Hide resolved
res, err := doRequest(ctx,
"GetRuleGroupConfig",
m.etcdCli.Endpoints(),
path.Join(pdapi.Config, "rule_group", placement.TiFlashRuleGroupID),
"GET",
nil,
)

if err != nil {
return errors.Trace(err)
}

var groupConfig placement.RuleGroupConfig
shouldUpdate := res == nil
if res != nil {
if err = json.Unmarshal(res, &groupConfig); err != nil {
return errors.Trace(err)
}

if groupConfig.Index != placement.RuleIndexTiFlash {
shouldUpdate = true
}
}

if shouldUpdate {
groupConfig.ID = placement.TiFlashRuleGroupID
groupConfig.Index = placement.RuleIndexTiFlash

body, err := json.Marshal(&groupConfig)
if err != nil {
return errors.Trace(err)
}

_, err = doRequest(ctx,
"SetRuleGroupConfig",
m.etcdCli.Endpoints(),
path.Join(pdapi.Config, "rule_group"),
"POST",
bytes.NewBuffer(body),
)

if err != nil {
return errors.Trace(err)
}
}
return nil
}

// SetPlacementRule is a helper function to set placement rule.
func (m *TiFlashPDPlacementManager) SetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error {
if err := m.SetTiFlashGroupConfig(ctx); err != nil {
return err
}

if rule.Count == 0 {
return m.DeletePlacementRule(ctx, rule.GroupID, rule.ID)
}
Expand Down Expand Up @@ -195,7 +251,7 @@ type mockTiFlashPlacementManager struct {

func makeBaseRule() placement.TiFlashRule {
return placement.TiFlashRule{
GroupID: "tiflash",
GroupID: placement.TiFlashRuleGroupID,
ID: "",
Index: placement.RuleIndexTiFlash,
Override: false,
Expand Down Expand Up @@ -248,6 +304,7 @@ func (m *mockTiFlashTableInfo) String() string {
// MockTiFlash mocks a TiFlash, with necessary Pd support.
type MockTiFlash struct {
sync.Mutex
GroupIndex uint64
StatusAddr string
StatusServer *httptest.Server
SyncStatus map[int]mockTiFlashTableInfo
Expand Down Expand Up @@ -315,6 +372,7 @@ func NewMockTiFlash() *MockTiFlash {
func (tiflash *MockTiFlash) HandleSetPlacementRule(rule placement.TiFlashRule) error {
tiflash.Lock()
defer tiflash.Unlock()
tiflash.GroupIndex = placement.RuleIndexTiFlash
if !tiflash.PdEnabled {
logutil.BgLogger().Info("pd server is manually disabled, just quit")
return nil
Expand Down Expand Up @@ -532,6 +590,17 @@ func (m *mockTiFlashPlacementManager) SetMockTiFlash(tiflash *MockTiFlash) {
m.tiflash = tiflash
}

// SetTiFlashGroupConfig sets the tiflash's rule group config
func (m *mockTiFlashPlacementManager) SetTiFlashGroupConfig(_ context.Context) error {
m.Lock()
defer m.Unlock()
if m.tiflash == nil {
return nil
}
m.tiflash.GroupIndex = placement.RuleIndexTiFlash
return nil
}

// SetPlacementRule is a helper function to set placement rule.
func (m *mockTiFlashPlacementManager) SetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error {
m.Lock()
Expand Down