Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable placement rules automatically (#2328) #2340

Merged
merged 2 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions pkg/manager/member/pd_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,17 +741,7 @@ func getPDConfigMap(tc *v1alpha1.TidbCluster) (*corev1.ConfigMap, error) {
config.Dashboard.TiDBCertPath = path.Join(tidbClientCertPath, corev1.TLSCertKey)
config.Dashboard.TiDBKeyPath = path.Join(tidbClientCertPath, corev1.TLSPrivateKeyKey)
}
// TiFlash requires PD to enable the `replication.enable-placement-rules`
// Check detail in https://pingcap.com/docs/stable/reference/tiflash/deploy/
if tc.Spec.TiFlash != nil {
if config.Replication == nil {
config.Replication = &v1alpha1.PDReplicationConfig{}
}
if config.Replication.EnablePlacementRules == nil {
enable := true
config.Replication.EnablePlacementRules = &enable
}
}

confText, err := MarshalTOML(config)
if err != nil {
return nil, err
Expand Down
24 changes: 23 additions & 1 deletion pkg/manager/member/tiflash_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,36 @@ func (tfmm *tiflashMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
return controller.RequeueErrorf("TidbCluster: [%s/%s], waiting for PD cluster running", ns, tcName)
}

err := tfmm.enablePlacementRules(tc)
if err != nil {
klog.Errorf("Enable placement rules failed, error: %v", err)
// No need to return err here, just continue to sync tiflash
}
// Sync TiFlash Headless Service
if err := tfmm.syncHeadlessService(tc); err != nil {
if err = tfmm.syncHeadlessService(tc); err != nil {
return err
}

return tfmm.syncStatefulSet(tc)
}

func (tfmm *tiflashMemberManager) enablePlacementRules(tc *v1alpha1.TidbCluster) error {
pdCli := controller.GetPDClient(tfmm.pdControl, tc)
config, err := pdCli.GetConfig()
if err != nil {
return err
}
if config.Replication.EnablePlacementRules != nil && (!*config.Replication.EnablePlacementRules) {
klog.Infof("Cluster %s/%s enable-placement-rules is %v, set it to true", tc.Namespace, tc.Name, *config.Replication.EnablePlacementRules)
enable := true
rep := pdapi.PDReplicationConfig{
EnablePlacementRules: &enable,
}
return pdCli.UpdateReplicationConfig(rep)
}
return nil
}

func (tfmm *tiflashMemberManager) syncHeadlessService(tc *v1alpha1.TidbCluster) error {
if tc.Spec.Paused {
klog.V(4).Infof("tiflash cluster %s/%s is paused, skip syncing for tiflash service", tc.GetNamespace(), tc.GetName())
Expand Down
39 changes: 36 additions & 3 deletions pkg/pdapi/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ type PDClient interface {
// storeLabelsEqualNodeLabels compares store labels with node labels
// for historic reasons, PD stores TiKV labels as []*StoreLabel which is a key-value pair slice
SetStoreLabels(storeID uint64, labels map[string]string) (bool, error)
// UpdateReplicationConfig updates the replication config
UpdateReplicationConfig(config PDReplicationConfig) error
// DeleteStore deletes a TiKV store from cluster
DeleteStore(storeID uint64) error
// SetStoreState sets store to specified state.
Expand Down Expand Up @@ -185,6 +187,7 @@ var (
schedulersPrefix = "pd/api/v1/schedulers"
pdLeaderPrefix = "pd/api/v1/leader"
pdLeaderTransferPrefix = "pd/api/v1/leader/transfer"
pdReplicationPrefix = "pd/api/v1/config/replicate"
)

// pdClient is default implementation of PDClient
Expand Down Expand Up @@ -512,6 +515,24 @@ func (pc *pdClient) SetStoreLabels(storeID uint64, labels map[string]string) (bo
return false, fmt.Errorf("failed %v to set store labels: %v", res.StatusCode, err2)
}

func (pc *pdClient) UpdateReplicationConfig(config PDReplicationConfig) error {
apiURL := fmt.Sprintf("%s/%s", pc.url, pdReplicationPrefix)
data, err := json.Marshal(config)
if err != nil {
return err
}
res, err := pc.httpClient.Post(apiURL, "application/json", bytes.NewBuffer(data))
if err != nil {
return err
}
defer httputil.DeferClose(res.Body)
if res.StatusCode == http.StatusOK {
return nil
}
err = httputil.ReadErrorBody(res.Body)
return fmt.Errorf("failed %v to update replication: %v", res.StatusCode, err)
}

func (pc *pdClient) BeginEvictLeader(storeID uint64) error {
leaderEvictInfo := getLeaderEvictSchedulerInfo(storeID)
apiURL := fmt.Sprintf("%s/%s", pc.url, schedulersPrefix)
Expand Down Expand Up @@ -697,6 +718,7 @@ const (
DeleteMemberByIDActionType ActionType = "DeleteMemberByID"
DeleteMemberActionType ActionType = "DeleteMember "
SetStoreLabelsActionType ActionType = "SetStoreLabels"
UpdateReplicationActionType ActionType = "UpdateReplicationConfig"
BeginEvictLeaderActionType ActionType = "BeginEvictLeader"
EndEvictLeaderActionType ActionType = "EndEvictLeader"
GetEvictLeaderSchedulersActionType ActionType = "GetEvictLeaderSchedulers"
Expand All @@ -713,9 +735,10 @@ func (nfr *NotFoundReaction) Error() string {
}

type Action struct {
ID uint64
Name string
Labels map[string]string
ID uint64
Name string
Labels map[string]string
Replication PDReplicationConfig
}

type Reaction func(action *Action) (interface{}, error)
Expand Down Expand Up @@ -855,6 +878,16 @@ func (pc *FakePDClient) SetStoreLabels(storeID uint64, labels map[string]string)
return true, nil
}

// UpdateReplicationConfig updates the replication config
func (pc *FakePDClient) UpdateReplicationConfig(config PDReplicationConfig) error {
if reaction, ok := pc.reactions[UpdateReplicationActionType]; ok {
action := &Action{Replication: config}
_, err := reaction(action)
return err
}
return nil
}

func (pc *FakePDClient) BeginEvictLeader(storeID uint64) error {
if reaction, ok := pc.reactions[BeginEvictLeaderActionType]; ok {
action := &Action{ID: storeID}
Expand Down