diff --git a/components/dm/spec/bindversion.go b/components/dm/spec/bindversion.go index 5464c952dc..cfda8ffa05 100644 --- a/components/dm/spec/bindversion.go +++ b/components/dm/spec/bindversion.go @@ -8,7 +8,9 @@ func DMComponentVersion(comp, version string) string { switch comp { case spec.ComponentAlertmanager, spec.ComponentGrafana, - spec.ComponentPrometheus: + spec.ComponentPrometheus, + spec.ComponentBlackboxExporter, + spec.ComponentNodeExporter: return "" default: return version diff --git a/components/dm/spec/logic.go b/components/dm/spec/logic.go index 313c5be4bb..189154e830 100644 --- a/components/dm/spec/logic.go +++ b/components/dm/spec/logic.go @@ -302,7 +302,7 @@ func (topo *Specification) GetGlobalOptions() spec.GlobalOptions { // GetMonitoredOptions returns MonitoredOptions func (topo *Specification) GetMonitoredOptions() *spec.MonitoredOptions { - return nil + return topo.MonitoredOptions } // ComponentsByStopOrder return component in the order need to stop. diff --git a/components/dm/spec/topology_dm.go b/components/dm/spec/topology_dm.go index 04c99eceeb..d89fb5cd5d 100644 --- a/components/dm/spec/topology_dm.go +++ b/components/dm/spec/topology_dm.go @@ -35,6 +35,7 @@ const ( var ( globalOptionTypeName = reflect.TypeOf(GlobalOptions{}).Name() + monitorOptionTypeName = reflect.TypeOf(MonitoredOptions{}).Name() serverConfigsTypeName = reflect.TypeOf(DMServerConfigs{}).Name() ) @@ -59,13 +60,21 @@ func findField(v reflect.Value, fieldName string) (int, bool) { // Skip global/monitored/job options func isSkipField(field reflect.Value) bool { + if field.Kind() == reflect.Ptr { + if field.IsZero() { + return true + } + field = field.Elem() + } tp := field.Type().Name() - return tp == globalOptionTypeName || tp == serverConfigsTypeName + return tp == globalOptionTypeName || tp == monitorOptionTypeName || tp == serverConfigsTypeName } type ( // GlobalOptions of spec. GlobalOptions = spec.GlobalOptions + // MonitoredOptions is the spec of Monitored + MonitoredOptions = spec.MonitoredOptions // PrometheusSpec is the spec of Prometheus PrometheusSpec = spec.PrometheusSpec // GrafanaSpec is the spec of Grafana @@ -85,14 +94,14 @@ type ( // Specification represents the specification of topology.yaml Specification struct { - GlobalOptions GlobalOptions `yaml:"global,omitempty" validate:"global:editable"` - // MonitoredOptions MonitoredOptions `yaml:"monitored,omitempty" validate:"monitored:editable"` - ServerConfigs DMServerConfigs `yaml:"server_configs,omitempty" validate:"server_configs:ignore"` - Masters []*MasterSpec `yaml:"master_servers"` - Workers []*WorkerSpec `yaml:"worker_servers"` - Monitors []*spec.PrometheusSpec `yaml:"monitoring_servers"` - Grafanas []*spec.GrafanaSpec `yaml:"grafana_servers,omitempty"` - Alertmanagers []*spec.AlertmanagerSpec `yaml:"alertmanager_servers,omitempty"` + GlobalOptions GlobalOptions `yaml:"global,omitempty" validate:"global:editable"` + MonitoredOptions *MonitoredOptions `yaml:"monitored,omitempty" validate:"monitored:editable"` + ServerConfigs DMServerConfigs `yaml:"server_configs,omitempty" validate:"server_configs:ignore"` + Masters []*MasterSpec `yaml:"master_servers"` + Workers []*WorkerSpec `yaml:"worker_servers"` + Monitors []*spec.PrometheusSpec `yaml:"monitoring_servers"` + Grafanas []*spec.GrafanaSpec `yaml:"grafana_servers,omitempty"` + Alertmanagers []*spec.AlertmanagerSpec `yaml:"alertmanager_servers,omitempty"` } ) @@ -225,26 +234,45 @@ func (s *WorkerSpec) IsImported() bool { } // UnmarshalYAML sets default values when unmarshaling the topology file -func (topo *Specification) UnmarshalYAML(unmarshal func(interface{}) error) error { +func (s *Specification) UnmarshalYAML(unmarshal func(interface{}) error) error { type topology Specification - if err := unmarshal((*topology)(topo)); err != nil { + if err := unmarshal((*topology)(s)); err != nil { return err } - if err := defaults.Set(topo); err != nil { + if err := defaults.Set(s); err != nil { return errors.Trace(err) } - if err := fillDMCustomDefaults(&topo.GlobalOptions, topo); err != nil { + if s.MonitoredOptions != nil { + // Set monitored options + if s.MonitoredOptions.DeployDir == "" { + s.MonitoredOptions.DeployDir = filepath.Join(s.GlobalOptions.DeployDir, + fmt.Sprintf("%s-%d", spec.RoleMonitor, s.MonitoredOptions.NodeExporterPort)) + } + if s.MonitoredOptions.DataDir == "" { + s.MonitoredOptions.DataDir = filepath.Join(s.GlobalOptions.DataDir, + fmt.Sprintf("%s-%d", spec.RoleMonitor, s.MonitoredOptions.NodeExporterPort)) + } + if s.MonitoredOptions.LogDir == "" { + s.MonitoredOptions.LogDir = "log" + } + if !strings.HasPrefix(s.MonitoredOptions.LogDir, "/") && + !strings.HasPrefix(s.MonitoredOptions.LogDir, s.MonitoredOptions.DeployDir) { + s.MonitoredOptions.LogDir = filepath.Join(s.MonitoredOptions.DeployDir, s.MonitoredOptions.LogDir) + } + } + + if err := fillDMCustomDefaults(&s.GlobalOptions, s); err != nil { return err } - return topo.Validate() + return s.Validate() } // platformConflictsDetect checks for conflicts in topology for different OS / Arch // for set to the same host / IP -func (topo *Specification) platformConflictsDetect() error { +func (s *Specification) platformConflictsDetect() error { type ( conflict struct { os string @@ -254,8 +282,8 @@ func (topo *Specification) platformConflictsDetect() error { ) platformStats := map[string]conflict{} - topoSpec := reflect.ValueOf(topo).Elem() - topoType := reflect.TypeOf(topo).Elem() + topoSpec := reflect.ValueOf(s).Elem() + topoType := reflect.TypeOf(s).Elem() for i := 0; i < topoSpec.NumField(); i++ { if isSkipField(topoSpec.Field(i)) { @@ -305,7 +333,7 @@ func (topo *Specification) platformConflictsDetect() error { return nil } -func (topo *Specification) portConflictsDetect() error { +func (s *Specification) portConflictsDetect() error { type ( usedPort struct { host string @@ -330,8 +358,8 @@ func (topo *Specification) portConflictsDetect() error { portStats := map[usedPort]conflict{} uniqueHosts := set.NewStringSet() - topoSpec := reflect.ValueOf(topo).Elem() - topoType := reflect.TypeOf(topo).Elem() + topoSpec := reflect.ValueOf(s).Elem() + topoType := reflect.TypeOf(s).Elem() for i := 0; i < topoSpec.NumField(); i++ { if isSkipField(topoSpec.Field(i)) { @@ -380,10 +408,51 @@ func (topo *Specification) portConflictsDetect() error { } } + // Port conflicts in monitored components + monitoredPortTypes := []string{ + "NodeExporterPort", + "BlackboxExporterPort", + } + monitoredOpt := topoSpec.FieldByName(monitorOptionTypeName) + if monitoredOpt.IsZero() { + return nil + } + monitoredOpt = monitoredOpt.Elem() + for host := range uniqueHosts { + cfg := "monitored" + for _, portType := range monitoredPortTypes { + f := monitoredOpt.FieldByName(portType) + item := usedPort{ + host: host, + port: int(f.Int()), + } + ft, found := monitoredOpt.Type().FieldByName(portType) + if !found { + return errors.Errorf("incompatible change `%s.%s`", monitorOptionTypeName, portType) + } + // `yaml:"node_exporter_port,omitempty"` + tp := strings.Split(ft.Tag.Get("yaml"), ",")[0] + prev, exist := portStats[item] + if exist { + return &meta.ValidateErr{ + Type: meta.TypeConflict, + Target: "port", + LHS: fmt.Sprintf("%s:%s.%s", prev.cfg, item.host, prev.tp), + RHS: fmt.Sprintf("%s:%s.%s", cfg, item.host, tp), + Value: item.port, + } + } + portStats[item] = conflict{ + tp: tp, + cfg: cfg, + } + } + } + return nil } -func (topo *Specification) dirConflictsDetect() error { +func (s *Specification) dirConflictsDetect() error { type ( usedDir struct { host string @@ -406,8 +475,8 @@ func (topo *Specification) dirConflictsDetect() error { uniqueHosts = set.NewStringSet() ) - topoSpec := reflect.ValueOf(topo).Elem() - topoType := reflect.TypeOf(topo).Elem() + topoSpec := reflect.ValueOf(s).Elem() + topoType := reflect.TypeOf(s).Elem() for i := 0; i < topoSpec.NumField(); i++ { if isSkipField(topoSpec.Field(i)) { @@ -467,7 +536,7 @@ func (topo *Specification) dirConflictsDetect() error { // CountDir counts for dir paths used by any instance in the cluster with the same // prefix, useful to find potential path conflicts -func (topo *Specification) CountDir(targetHost, dirPrefix string) int { +func (s *Specification) CountDir(targetHost, dirPrefix string) int { dirTypes := []string{ "DataDir", "DeployDir", @@ -477,8 +546,8 @@ func (topo *Specification) CountDir(targetHost, dirPrefix string) int { // host-path -> count dirStats := make(map[string]int) count := 0 - topoSpec := reflect.ValueOf(topo).Elem() - dirPrefix = spec.Abs(topo.GlobalOptions.User, dirPrefix) + topoSpec := reflect.ValueOf(s).Elem() + dirPrefix = spec.Abs(s.GlobalOptions.User, dirPrefix) for i := 0; i < topoSpec.NumField(); i++ { if isSkipField(topoSpec.Field(i)) { @@ -515,7 +584,7 @@ func (topo *Specification) CountDir(targetHost, dirPrefix string) int { dir = filepath.Join(deployDir, dir) } } - dir = spec.Abs(topo.GlobalOptions.User, dir) + dir = spec.Abs(s.GlobalOptions.User, dir) dirStats[host+dir]++ } } @@ -532,8 +601,8 @@ func (topo *Specification) CountDir(targetHost, dirPrefix string) int { } // TLSConfig generates a tls.Config for the specification as needed -func (topo *Specification) TLSConfig(dir string) (*tls.Config, error) { - if !topo.GlobalOptions.TLSEnabled { +func (s *Specification) TLSConfig(dir string) (*tls.Config, error) { + if !s.GlobalOptions.TLSEnabled { return nil, nil } return spec.LoadClientCert(dir) @@ -541,62 +610,63 @@ func (topo *Specification) TLSConfig(dir string) (*tls.Config, error) { // Validate validates the topology specification and produce error if the // specification invalid (e.g: port conflicts or directory conflicts) -func (topo *Specification) Validate() error { - if err := topo.platformConflictsDetect(); err != nil { +func (s *Specification) Validate() error { + if err := s.platformConflictsDetect(); err != nil { return err } - if err := topo.portConflictsDetect(); err != nil { + if err := s.portConflictsDetect(); err != nil { return err } - if err := topo.dirConflictsDetect(); err != nil { + if err := s.dirConflictsDetect(); err != nil { return err } - return spec.RelativePathDetect(topo, isSkipField) + return spec.RelativePathDetect(s, isSkipField) } // Type implements Topology interface. -func (topo *Specification) Type() string { +func (s *Specification) Type() string { return spec.TopoTypeDM } // BaseTopo implements Topology interface. -func (topo *Specification) BaseTopo() *spec.BaseTopo { +func (s *Specification) BaseTopo() *spec.BaseTopo { return &spec.BaseTopo{ - GlobalOptions: &topo.GlobalOptions, - MonitoredOptions: topo.GetMonitoredOptions(), - MasterList: topo.GetMasterList(), - Monitors: topo.Monitors, - Grafanas: topo.Grafanas, - Alertmanagers: topo.Alertmanagers, + GlobalOptions: &s.GlobalOptions, + MonitoredOptions: s.GetMonitoredOptions(), + MasterList: s.GetMasterList(), + Monitors: s.Monitors, + Grafanas: s.Grafanas, + Alertmanagers: s.Alertmanagers, } } // NewPart implements ScaleOutTopology interface. -func (topo *Specification) NewPart() spec.Topology { +func (s *Specification) NewPart() spec.Topology { return &Specification{ - GlobalOptions: topo.GlobalOptions, - ServerConfigs: topo.ServerConfigs, + GlobalOptions: s.GlobalOptions, + MonitoredOptions: s.MonitoredOptions, + ServerConfigs: s.ServerConfigs, } } // MergeTopo implements ScaleOutTopology interface. -func (topo *Specification) MergeTopo(rhs spec.Topology) spec.Topology { +func (s *Specification) MergeTopo(rhs spec.Topology) spec.Topology { other, ok := rhs.(*Specification) if !ok { panic("topo should be DM Topology") } - return topo.Merge(other) + return s.Merge(other) } // GetMasterList returns a list of Master API hosts of the current cluster -func (topo *Specification) GetMasterList() []string { +func (s *Specification) GetMasterList() []string { var masterList []string - for _, master := range topo.Masters { + for _, master := range s.Masters { masterList = append(masterList, fmt.Sprintf("%s:%d", master.Host, master.Port)) } @@ -604,17 +674,17 @@ func (topo *Specification) GetMasterList() []string { } // Merge returns a new Topology which sum old ones -func (topo *Specification) Merge(that spec.Topology) spec.Topology { +func (s *Specification) Merge(that spec.Topology) spec.Topology { spec := that.(*Specification) return &Specification{ - GlobalOptions: topo.GlobalOptions, - // MonitoredOptions: topo.MonitoredOptions, - ServerConfigs: topo.ServerConfigs, - Masters: append(topo.Masters, spec.Masters...), - Workers: append(topo.Workers, spec.Workers...), - Monitors: append(topo.Monitors, spec.Monitors...), - Grafanas: append(topo.Grafanas, spec.Grafanas...), - Alertmanagers: append(topo.Alertmanagers, spec.Alertmanagers...), + GlobalOptions: s.GlobalOptions, + MonitoredOptions: s.MonitoredOptions, + ServerConfigs: s.ServerConfigs, + Masters: append(s.Masters, spec.Masters...), + Workers: append(s.Workers, spec.Workers...), + Monitors: append(s.Monitors, spec.Monitors...), + Grafanas: append(s.Grafanas, spec.Grafanas...), + Alertmanagers: append(s.Alertmanagers, spec.Alertmanagers...), } } diff --git a/components/dm/spec/topology_dm_test.go b/components/dm/spec/topology_dm_test.go index ea03e76078..4793039f63 100644 --- a/components/dm/spec/topology_dm_test.go +++ b/components/dm/spec/topology_dm_test.go @@ -521,3 +521,21 @@ worker_servers: assert.Equal(t, "/my-global-deploy/dm-worker-8262/data", topo.Workers[4].DataDir) }) } + +func TestMonitorLogDir(t *testing.T) { + withTempFile(` +monitored: + node_exporter_port: 39100 + blackbox_exporter_port: 39115 + deploy_dir: "test-deploy" + log_dir: "test-deploy/log" +`, func(file string) { + topo := Specification{} + err := spec.ParseTopologyYaml(file, &topo) + assert.Nil(t, err) + assert.Equal(t, 39100, topo.MonitoredOptions.NodeExporterPort) + assert.Equal(t, 39115, topo.MonitoredOptions.BlackboxExporterPort) + assert.Equal(t, "test-deploy/log", topo.MonitoredOptions.LogDir) + assert.Equal(t, "test-deploy", topo.MonitoredOptions.DeployDir) + }) +} diff --git a/embed/templates/config/prometheus.yml.tpl b/embed/templates/config/prometheus.yml.tpl index c179ff6570..3a5482228b 100644 --- a/embed/templates/config/prometheus.yml.tpl +++ b/embed/templates/config/prometheus.yml.tpl @@ -14,7 +14,7 @@ rule_files: - '{{.}}' {{- end}} {{- else}} -{{- if .MonitoredServers}} +{{- if and .MonitoredServers .PDAddrs}} - 'node.rules.yml' - 'blacker.rules.yml' - 'bypass.rules.yml' @@ -382,4 +382,4 @@ scrape_configs: {{- if .RemoteConfig}} {{.RemoteConfig}} -{{- end}} \ No newline at end of file +{{- end}} diff --git a/tests/tiup-cluster/topo/full.yaml b/tests/tiup-cluster/topo/full.yaml index 6d9164aa6d..999b483c88 100644 --- a/tests/tiup-cluster/topo/full.yaml +++ b/tests/tiup-cluster/topo/full.yaml @@ -69,3 +69,7 @@ grafana_servers: alertmanager_servers: - host: n1 config_file: /tmp/local/alertmanager/alertmanager.yml + +monitored: + node_exporter_port: 9100 + blackbox_exporter_port: 9115 diff --git a/tests/tiup-dm/script/task/run.sh b/tests/tiup-dm/script/task/run.sh index 09273cd87a..f38a0550ec 100755 --- a/tests/tiup-dm/script/task/run.sh +++ b/tests/tiup-dm/script/task/run.sh @@ -27,8 +27,8 @@ $ctl --master-addr n1:8261 start-task $wd/task.yaml sleep 10 # wait to replicate line=$(echo "select * from db_target.t_target" | mysql -h tidb1 -P 4000 | wc -l) if [ $line = 10 ];then - echo "replicate data success" + echo "replicate data success" else - echo "fail to replicate data, line is $line" - exit -1 + echo "fail to replicate data, line is $line" + exit -1 fi diff --git a/tests/tiup-dm/test_cmd.sh b/tests/tiup-dm/test_cmd.sh index bbad38ad3f..3e861e6e42 100755 --- a/tests/tiup-dm/test_cmd.sh +++ b/tests/tiup-dm/test_cmd.sh @@ -16,6 +16,8 @@ mkdir -p ~/.tiup/bin && cp -f ./root.json ~/.tiup/bin/ tiup-dm --yes deploy $name $version $topo -i ~/.ssh/id_rsa +# topology doesn't contains the section `monitored` will not deploy node_exporter, blackbox_exporter +! tiup-dm exec $name -N $ipprefix.101 --command "ls /etc/systemd/system/{node,blackbox}_exporter-*.service" tiup-dm list | grep "$name" # debug https://github.com/pingcap/tiup/issues/666 @@ -115,3 +117,14 @@ tiup-dm --yes destroy $name # after destroy the cluster, the public key should be deleted ! ssh -o "StrictHostKeyChecking=no" -o "PasswordAuthentication=no" -i "/tmp/$name.id_rsa" tidb@$ipprefix.102 "ls" unlink "/tmp/$name.id_rsa" + +topo=./topo/full_dm_monitored.yaml + +ipprefix=${TIUP_TEST_IP_PREFIX:-"172.19.0"} +sed "s/__IPPREFIX__/$ipprefix/g" $topo.tpl > $topo +tiup-dm --yes deploy $name $version $topo -i ~/.ssh/id_rsa + +# topology contains the section `monitored` will deploy node_exporter, blackbox_exporter +tiup-dm exec $name -N $ipprefix.101 --command "ls /etc/systemd/system/{node,blackbox}_exporter-*.service" + +tiup-dm --yes destroy $name diff --git a/tests/tiup-dm/topo/full_dm_monitored.yaml.tpl b/tests/tiup-dm/topo/full_dm_monitored.yaml.tpl new file mode 100644 index 0000000000..57cec4cc3e --- /dev/null +++ b/tests/tiup-dm/topo/full_dm_monitored.yaml.tpl @@ -0,0 +1,41 @@ +server_configs: + master: + rpc-timeout: "30s" # you many need update test_upgrade.sh if change this cause it's used to test edit-config + rpc-rate-limit: 10.0 + rpc-rate-burst: 40 + +master_servers: + - host: __IPPREFIX__.101 + config: + rpc-rate-limit: 10.0 + rpc-rate-burst: 40 + - host: __IPPREFIX__.101 + port: 8361 + peer_port: 8292 + - host: __IPPREFIX__.102 + - host: __IPPREFIX__.103 + data_dir: /home/tidb/my_master_data + - host: __IPPREFIX__.104 + +worker_servers: + - host: __IPPREFIX__.101 + port: 8262 + - host: __IPPREFIX__.101 + port: 8263 + - host: __IPPREFIX__.102 + - host: __IPPREFIX__.103 + - host: __IPPREFIX__.104 + +monitoring_servers: + - host: __IPPREFIX__.101 + rule_dir: /tmp/local/prometheus +grafana_servers: + - host: __IPPREFIX__.101 + dashboard_dir: /tmp/local/grafana +alertmanager_servers: + - host: __IPPREFIX__.101 + config_file: /tmp/local/alertmanager/alertmanager.yml + +monitored: + node_exporter_port: 39100 + blackbox_exporter_port: 39115