Skip to content

Commit

Permalink
dm: support monitored config (#1235)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsvisa authored Mar 29, 2021
1 parent 1225c01 commit 37487dc
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 65 deletions.
4 changes: 3 additions & 1 deletion components/dm/spec/bindversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ func DMComponentVersion(comp, version string) string {
switch comp {
case spec.ComponentAlertmanager,
spec.ComponentGrafana,
spec.ComponentPrometheus:
spec.ComponentPrometheus,
spec.ComponentBlackboxExporter,
spec.ComponentNodeExporter:
return ""
default:
return version
Expand Down
2 changes: 1 addition & 1 deletion components/dm/spec/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (topo *Specification) GetGlobalOptions() spec.GlobalOptions {

// GetMonitoredOptions returns MonitoredOptions
func (topo *Specification) GetMonitoredOptions() *spec.MonitoredOptions {
return nil
return topo.MonitoredOptions
}

// ComponentsByStopOrder return component in the order need to stop.
Expand Down
186 changes: 128 additions & 58 deletions components/dm/spec/topology_dm.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (

var (
globalOptionTypeName = reflect.TypeOf(GlobalOptions{}).Name()
monitorOptionTypeName = reflect.TypeOf(MonitoredOptions{}).Name()
serverConfigsTypeName = reflect.TypeOf(DMServerConfigs{}).Name()
)

Expand All @@ -59,13 +60,21 @@ func findField(v reflect.Value, fieldName string) (int, bool) {

// Skip global/monitored/job options
func isSkipField(field reflect.Value) bool {
if field.Kind() == reflect.Ptr {
if field.IsZero() {
return true
}
field = field.Elem()
}
tp := field.Type().Name()
return tp == globalOptionTypeName || tp == serverConfigsTypeName
return tp == globalOptionTypeName || tp == monitorOptionTypeName || tp == serverConfigsTypeName
}

type (
// GlobalOptions of spec.
GlobalOptions = spec.GlobalOptions
// MonitoredOptions is the spec of Monitored
MonitoredOptions = spec.MonitoredOptions
// PrometheusSpec is the spec of Prometheus
PrometheusSpec = spec.PrometheusSpec
// GrafanaSpec is the spec of Grafana
Expand All @@ -85,14 +94,14 @@ type (

// Specification represents the specification of topology.yaml
Specification struct {
GlobalOptions GlobalOptions `yaml:"global,omitempty" validate:"global:editable"`
// MonitoredOptions MonitoredOptions `yaml:"monitored,omitempty" validate:"monitored:editable"`
ServerConfigs DMServerConfigs `yaml:"server_configs,omitempty" validate:"server_configs:ignore"`
Masters []*MasterSpec `yaml:"master_servers"`
Workers []*WorkerSpec `yaml:"worker_servers"`
Monitors []*spec.PrometheusSpec `yaml:"monitoring_servers"`
Grafanas []*spec.GrafanaSpec `yaml:"grafana_servers,omitempty"`
Alertmanagers []*spec.AlertmanagerSpec `yaml:"alertmanager_servers,omitempty"`
GlobalOptions GlobalOptions `yaml:"global,omitempty" validate:"global:editable"`
MonitoredOptions *MonitoredOptions `yaml:"monitored,omitempty" validate:"monitored:editable"`
ServerConfigs DMServerConfigs `yaml:"server_configs,omitempty" validate:"server_configs:ignore"`
Masters []*MasterSpec `yaml:"master_servers"`
Workers []*WorkerSpec `yaml:"worker_servers"`
Monitors []*spec.PrometheusSpec `yaml:"monitoring_servers"`
Grafanas []*spec.GrafanaSpec `yaml:"grafana_servers,omitempty"`
Alertmanagers []*spec.AlertmanagerSpec `yaml:"alertmanager_servers,omitempty"`
}
)

Expand Down Expand Up @@ -225,26 +234,45 @@ func (s *WorkerSpec) IsImported() bool {
}

// UnmarshalYAML sets default values when unmarshaling the topology file
func (topo *Specification) UnmarshalYAML(unmarshal func(interface{}) error) error {
func (s *Specification) UnmarshalYAML(unmarshal func(interface{}) error) error {
type topology Specification
if err := unmarshal((*topology)(topo)); err != nil {
if err := unmarshal((*topology)(s)); err != nil {
return err
}

if err := defaults.Set(topo); err != nil {
if err := defaults.Set(s); err != nil {
return errors.Trace(err)
}

if err := fillDMCustomDefaults(&topo.GlobalOptions, topo); err != nil {
if s.MonitoredOptions != nil {
// Set monitored options
if s.MonitoredOptions.DeployDir == "" {
s.MonitoredOptions.DeployDir = filepath.Join(s.GlobalOptions.DeployDir,
fmt.Sprintf("%s-%d", spec.RoleMonitor, s.MonitoredOptions.NodeExporterPort))
}
if s.MonitoredOptions.DataDir == "" {
s.MonitoredOptions.DataDir = filepath.Join(s.GlobalOptions.DataDir,
fmt.Sprintf("%s-%d", spec.RoleMonitor, s.MonitoredOptions.NodeExporterPort))
}
if s.MonitoredOptions.LogDir == "" {
s.MonitoredOptions.LogDir = "log"
}
if !strings.HasPrefix(s.MonitoredOptions.LogDir, "/") &&
!strings.HasPrefix(s.MonitoredOptions.LogDir, s.MonitoredOptions.DeployDir) {
s.MonitoredOptions.LogDir = filepath.Join(s.MonitoredOptions.DeployDir, s.MonitoredOptions.LogDir)
}
}

if err := fillDMCustomDefaults(&s.GlobalOptions, s); err != nil {
return err
}

return topo.Validate()
return s.Validate()
}

// platformConflictsDetect checks for conflicts in topology for different OS / Arch
// for set to the same host / IP
func (topo *Specification) platformConflictsDetect() error {
func (s *Specification) platformConflictsDetect() error {
type (
conflict struct {
os string
Expand All @@ -254,8 +282,8 @@ func (topo *Specification) platformConflictsDetect() error {
)

platformStats := map[string]conflict{}
topoSpec := reflect.ValueOf(topo).Elem()
topoType := reflect.TypeOf(topo).Elem()
topoSpec := reflect.ValueOf(s).Elem()
topoType := reflect.TypeOf(s).Elem()

for i := 0; i < topoSpec.NumField(); i++ {
if isSkipField(topoSpec.Field(i)) {
Expand Down Expand Up @@ -305,7 +333,7 @@ func (topo *Specification) platformConflictsDetect() error {
return nil
}

func (topo *Specification) portConflictsDetect() error {
func (s *Specification) portConflictsDetect() error {
type (
usedPort struct {
host string
Expand All @@ -330,8 +358,8 @@ func (topo *Specification) portConflictsDetect() error {

portStats := map[usedPort]conflict{}
uniqueHosts := set.NewStringSet()
topoSpec := reflect.ValueOf(topo).Elem()
topoType := reflect.TypeOf(topo).Elem()
topoSpec := reflect.ValueOf(s).Elem()
topoType := reflect.TypeOf(s).Elem()

for i := 0; i < topoSpec.NumField(); i++ {
if isSkipField(topoSpec.Field(i)) {
Expand Down Expand Up @@ -380,10 +408,51 @@ func (topo *Specification) portConflictsDetect() error {
}
}

// Port conflicts in monitored components
monitoredPortTypes := []string{
"NodeExporterPort",
"BlackboxExporterPort",
}
monitoredOpt := topoSpec.FieldByName(monitorOptionTypeName)
if monitoredOpt.IsZero() {
return nil
}
monitoredOpt = monitoredOpt.Elem()
for host := range uniqueHosts {
cfg := "monitored"
for _, portType := range monitoredPortTypes {
f := monitoredOpt.FieldByName(portType)
item := usedPort{
host: host,
port: int(f.Int()),
}
ft, found := monitoredOpt.Type().FieldByName(portType)
if !found {
return errors.Errorf("incompatible change `%s.%s`", monitorOptionTypeName, portType)
}
// `yaml:"node_exporter_port,omitempty"`
tp := strings.Split(ft.Tag.Get("yaml"), ",")[0]
prev, exist := portStats[item]
if exist {
return &meta.ValidateErr{
Type: meta.TypeConflict,
Target: "port",
LHS: fmt.Sprintf("%s:%s.%s", prev.cfg, item.host, prev.tp),
RHS: fmt.Sprintf("%s:%s.%s", cfg, item.host, tp),
Value: item.port,
}
}
portStats[item] = conflict{
tp: tp,
cfg: cfg,
}
}
}

return nil
}

func (topo *Specification) dirConflictsDetect() error {
func (s *Specification) dirConflictsDetect() error {
type (
usedDir struct {
host string
Expand All @@ -406,8 +475,8 @@ func (topo *Specification) dirConflictsDetect() error {
uniqueHosts = set.NewStringSet()
)

topoSpec := reflect.ValueOf(topo).Elem()
topoType := reflect.TypeOf(topo).Elem()
topoSpec := reflect.ValueOf(s).Elem()
topoType := reflect.TypeOf(s).Elem()

for i := 0; i < topoSpec.NumField(); i++ {
if isSkipField(topoSpec.Field(i)) {
Expand Down Expand Up @@ -467,7 +536,7 @@ func (topo *Specification) dirConflictsDetect() error {

// CountDir counts for dir paths used by any instance in the cluster with the same
// prefix, useful to find potential path conflicts
func (topo *Specification) CountDir(targetHost, dirPrefix string) int {
func (s *Specification) CountDir(targetHost, dirPrefix string) int {
dirTypes := []string{
"DataDir",
"DeployDir",
Expand All @@ -477,8 +546,8 @@ func (topo *Specification) CountDir(targetHost, dirPrefix string) int {
// host-path -> count
dirStats := make(map[string]int)
count := 0
topoSpec := reflect.ValueOf(topo).Elem()
dirPrefix = spec.Abs(topo.GlobalOptions.User, dirPrefix)
topoSpec := reflect.ValueOf(s).Elem()
dirPrefix = spec.Abs(s.GlobalOptions.User, dirPrefix)

for i := 0; i < topoSpec.NumField(); i++ {
if isSkipField(topoSpec.Field(i)) {
Expand Down Expand Up @@ -515,7 +584,7 @@ func (topo *Specification) CountDir(targetHost, dirPrefix string) int {
dir = filepath.Join(deployDir, dir)
}
}
dir = spec.Abs(topo.GlobalOptions.User, dir)
dir = spec.Abs(s.GlobalOptions.User, dir)
dirStats[host+dir]++
}
}
Expand All @@ -532,89 +601,90 @@ func (topo *Specification) CountDir(targetHost, dirPrefix string) int {
}

// TLSConfig generates a tls.Config for the specification as needed
func (topo *Specification) TLSConfig(dir string) (*tls.Config, error) {
if !topo.GlobalOptions.TLSEnabled {
func (s *Specification) TLSConfig(dir string) (*tls.Config, error) {
if !s.GlobalOptions.TLSEnabled {
return nil, nil
}
return spec.LoadClientCert(dir)
}

// Validate validates the topology specification and produce error if the
// specification invalid (e.g: port conflicts or directory conflicts)
func (topo *Specification) Validate() error {
if err := topo.platformConflictsDetect(); err != nil {
func (s *Specification) Validate() error {
if err := s.platformConflictsDetect(); err != nil {
return err
}

if err := topo.portConflictsDetect(); err != nil {
if err := s.portConflictsDetect(); err != nil {
return err
}

if err := topo.dirConflictsDetect(); err != nil {
if err := s.dirConflictsDetect(); err != nil {
return err
}

return spec.RelativePathDetect(topo, isSkipField)
return spec.RelativePathDetect(s, isSkipField)
}

// Type implements Topology interface.
func (topo *Specification) Type() string {
func (s *Specification) Type() string {
return spec.TopoTypeDM
}

// BaseTopo implements Topology interface.
func (topo *Specification) BaseTopo() *spec.BaseTopo {
func (s *Specification) BaseTopo() *spec.BaseTopo {
return &spec.BaseTopo{
GlobalOptions: &topo.GlobalOptions,
MonitoredOptions: topo.GetMonitoredOptions(),
MasterList: topo.GetMasterList(),
Monitors: topo.Monitors,
Grafanas: topo.Grafanas,
Alertmanagers: topo.Alertmanagers,
GlobalOptions: &s.GlobalOptions,
MonitoredOptions: s.GetMonitoredOptions(),
MasterList: s.GetMasterList(),
Monitors: s.Monitors,
Grafanas: s.Grafanas,
Alertmanagers: s.Alertmanagers,
}
}

// NewPart implements ScaleOutTopology interface.
func (topo *Specification) NewPart() spec.Topology {
func (s *Specification) NewPart() spec.Topology {
return &Specification{
GlobalOptions: topo.GlobalOptions,
ServerConfigs: topo.ServerConfigs,
GlobalOptions: s.GlobalOptions,
MonitoredOptions: s.MonitoredOptions,
ServerConfigs: s.ServerConfigs,
}
}

// MergeTopo implements ScaleOutTopology interface.
func (topo *Specification) MergeTopo(rhs spec.Topology) spec.Topology {
func (s *Specification) MergeTopo(rhs spec.Topology) spec.Topology {
other, ok := rhs.(*Specification)
if !ok {
panic("topo should be DM Topology")
}

return topo.Merge(other)
return s.Merge(other)
}

// GetMasterList returns a list of Master API hosts of the current cluster
func (topo *Specification) GetMasterList() []string {
func (s *Specification) GetMasterList() []string {
var masterList []string

for _, master := range topo.Masters {
for _, master := range s.Masters {
masterList = append(masterList, fmt.Sprintf("%s:%d", master.Host, master.Port))
}

return masterList
}

// Merge returns a new Topology which sum old ones
func (topo *Specification) Merge(that spec.Topology) spec.Topology {
func (s *Specification) Merge(that spec.Topology) spec.Topology {
spec := that.(*Specification)
return &Specification{
GlobalOptions: topo.GlobalOptions,
// MonitoredOptions: topo.MonitoredOptions,
ServerConfigs: topo.ServerConfigs,
Masters: append(topo.Masters, spec.Masters...),
Workers: append(topo.Workers, spec.Workers...),
Monitors: append(topo.Monitors, spec.Monitors...),
Grafanas: append(topo.Grafanas, spec.Grafanas...),
Alertmanagers: append(topo.Alertmanagers, spec.Alertmanagers...),
GlobalOptions: s.GlobalOptions,
MonitoredOptions: s.MonitoredOptions,
ServerConfigs: s.ServerConfigs,
Masters: append(s.Masters, spec.Masters...),
Workers: append(s.Workers, spec.Workers...),
Monitors: append(s.Monitors, spec.Monitors...),
Grafanas: append(s.Grafanas, spec.Grafanas...),
Alertmanagers: append(s.Alertmanagers, spec.Alertmanagers...),
}
}

Expand Down
18 changes: 18 additions & 0 deletions components/dm/spec/topology_dm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,3 +521,21 @@ worker_servers:
assert.Equal(t, "/my-global-deploy/dm-worker-8262/data", topo.Workers[4].DataDir)
})
}

func TestMonitorLogDir(t *testing.T) {
withTempFile(`
monitored:
node_exporter_port: 39100
blackbox_exporter_port: 39115
deploy_dir: "test-deploy"
log_dir: "test-deploy/log"
`, func(file string) {
topo := Specification{}
err := spec.ParseTopologyYaml(file, &topo)
assert.Nil(t, err)
assert.Equal(t, 39100, topo.MonitoredOptions.NodeExporterPort)
assert.Equal(t, 39115, topo.MonitoredOptions.BlackboxExporterPort)
assert.Equal(t, "test-deploy/log", topo.MonitoredOptions.LogDir)
assert.Equal(t, "test-deploy", topo.MonitoredOptions.DeployDir)
})
}
Loading

0 comments on commit 37487dc

Please sign in to comment.