diff --git a/pkg/manager/member/pd_member_manager.go b/pkg/manager/member/pd_member_manager.go
index 0e9b240ebd..b8505fdc5a 100644
--- a/pkg/manager/member/pd_member_manager.go
+++ b/pkg/manager/member/pd_member_manager.go
@@ -741,17 +741,7 @@ func getPDConfigMap(tc *v1alpha1.TidbCluster) (*corev1.ConfigMap, error) {
 		config.Dashboard.TiDBCertPath = path.Join(tidbClientCertPath, corev1.TLSCertKey)
 		config.Dashboard.TiDBKeyPath = path.Join(tidbClientCertPath, corev1.TLSPrivateKeyKey)
 	}
-	// TiFlash requires PD to enable the `replication.enable-placement-rules`
-	// Check detail in https://pingcap.com/docs/stable/reference/tiflash/deploy/
-	if tc.Spec.TiFlash != nil {
-		if config.Replication == nil {
-			config.Replication = &v1alpha1.PDReplicationConfig{}
-		}
-		if config.Replication.EnablePlacementRules == nil {
-			enable := true
-			config.Replication.EnablePlacementRules = &enable
-		}
-	}
+
 	confText, err := MarshalTOML(config)
 	if err != nil {
 		return nil, err
diff --git a/pkg/manager/member/tiflash_member_manager.go b/pkg/manager/member/tiflash_member_manager.go
index 9079038793..533b8ba052 100644
--- a/pkg/manager/member/tiflash_member_manager.go
+++ b/pkg/manager/member/tiflash_member_manager.go
@@ -107,14 +107,36 @@ func (tfmm *tiflashMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
 		return controller.RequeueErrorf("TidbCluster: [%s/%s], waiting for PD cluster running", ns, tcName)
 	}
 
+	err := tfmm.enablePlacementRules(tc)
+	if err != nil {
+		klog.Errorf("Enable placement rules failed, error: %v", err)
+		// No need to return err here, just continue to sync tiflash
+	}
 	// Sync TiFlash Headless Service
-	if err := tfmm.syncHeadlessService(tc); err != nil {
+	if err = tfmm.syncHeadlessService(tc); err != nil {
 		return err
 	}
 
 	return tfmm.syncStatefulSet(tc)
 }
 
+func (tfmm *tiflashMemberManager) enablePlacementRules(tc *v1alpha1.TidbCluster) error {
+	pdCli := controller.GetPDClient(tfmm.pdControl, tc)
+	config, err := pdCli.GetConfig()
+	if err != nil {
+		return err
+	}
+	if config.Replication.EnablePlacementRules != nil && (!*config.Replication.EnablePlacementRules) {
+		klog.Infof("Cluster %s/%s enable-placement-rules is %v, set it to true", tc.Namespace, tc.Name, *config.Replication.EnablePlacementRules)
+		enable := true
+		rep := pdapi.PDReplicationConfig{
+			EnablePlacementRules: &enable,
+		}
+		return pdCli.UpdateReplicationConfig(rep)
+	}
+	return nil
+}
+
 func (tfmm *tiflashMemberManager) syncHeadlessService(tc *v1alpha1.TidbCluster) error {
 	if tc.Spec.Paused {
 		klog.V(4).Infof("tiflash cluster %s/%s is paused, skip syncing for tiflash service", tc.GetNamespace(), tc.GetName())
diff --git a/pkg/pdapi/pdapi.go b/pkg/pdapi/pdapi.go
index 7e174b9cda..fe01c6c2c9 100644
--- a/pkg/pdapi/pdapi.go
+++ b/pkg/pdapi/pdapi.go
@@ -154,6 +154,8 @@ type PDClient interface {
 	// storeLabelsEqualNodeLabels compares store labels with node labels
 	// for historic reasons, PD stores TiKV labels as []*StoreLabel which is a key-value pair slice
 	SetStoreLabels(storeID uint64, labels map[string]string) (bool, error)
+	// UpdateReplicationConfig updates the replication config
+	UpdateReplicationConfig(config PDReplicationConfig) error
 	// DeleteStore deletes a TiKV store from cluster
 	DeleteStore(storeID uint64) error
 	// SetStoreState sets store to specified state.
@@ -185,6 +187,7 @@ var (
 	schedulersPrefix       = "pd/api/v1/schedulers"
 	pdLeaderPrefix         = "pd/api/v1/leader"
 	pdLeaderTransferPrefix = "pd/api/v1/leader/transfer"
+	pdReplicationPrefix    = "pd/api/v1/config/replicate"
 )
 
 // pdClient is default implementation of PDClient
@@ -512,6 +515,24 @@ func (pc *pdClient) SetStoreLabels(storeID uint64, labels map[string]string) (bo
 	return false, fmt.Errorf("failed %v to set store labels: %v", res.StatusCode, err2)
 }
 
+func (pc *pdClient) UpdateReplicationConfig(config PDReplicationConfig) error {
+	apiURL := fmt.Sprintf("%s/%s", pc.url, pdReplicationPrefix)
+	data, err := json.Marshal(config)
+	if err != nil {
+		return err
+	}
+	res, err := pc.httpClient.Post(apiURL, "application/json", bytes.NewBuffer(data))
+	if err != nil {
+		return err
+	}
+	defer httputil.DeferClose(res.Body)
+	if res.StatusCode == http.StatusOK {
+		return nil
+	}
+	err = httputil.ReadErrorBody(res.Body)
+	return fmt.Errorf("failed %v to update replication: %v", res.StatusCode, err)
+}
+
 func (pc *pdClient) BeginEvictLeader(storeID uint64) error {
 	leaderEvictInfo := getLeaderEvictSchedulerInfo(storeID)
 	apiURL := fmt.Sprintf("%s/%s", pc.url, schedulersPrefix)
@@ -697,6 +718,7 @@ const (
 	DeleteMemberByIDActionType  ActionType = "DeleteMemberByID"
 	DeleteMemberActionType      ActionType = "DeleteMember "
 	SetStoreLabelsActionType    ActionType = "SetStoreLabels"
+	UpdateReplicationActionType ActionType = "UpdateReplicationConfig"
 	BeginEvictLeaderActionType  ActionType = "BeginEvictLeader"
 	EndEvictLeaderActionType    ActionType = "EndEvictLeader"
 	GetEvictLeaderSchedulersActionType ActionType = "GetEvictLeaderSchedulers"
@@ -713,9 +735,10 @@ func (nfr *NotFoundReaction) Error() string {
 }
 
 type Action struct {
-	ID     uint64
-	Name   string
-	Labels map[string]string
+	ID          uint64
+	Name        string
+	Labels      map[string]string
+	Replication PDReplicationConfig
 }
 
 type Reaction func(action *Action) (interface{}, error)
@@ -855,6 +878,16 @@ func (pc *FakePDClient) SetStoreLabels(storeID uint64, labels map[string]string)
 	return true, nil
 }
 
+// UpdateReplicationConfig updates the replication config
+func (pc *FakePDClient) UpdateReplicationConfig(config PDReplicationConfig) error {
+	if reaction, ok := pc.reactions[UpdateReplicationActionType]; ok {
+		action := &Action{Replication: config}
+		_, err := reaction(action)
+		return err
+	}
+	return nil
+}
+
 func (pc *FakePDClient) BeginEvictLeader(storeID uint64) error {
 	if reaction, ok := pc.reactions[BeginEvictLeaderActionType]; ok {
 		action := &Action{ID: storeID}