cluster: optimize tispark supporting #621

Merged
merged 3 commits on Jul 22, 2020
4 changes: 2 additions & 2 deletions pkg/cluster/embed/autogen_pkger.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions pkg/cluster/spec/spec.go
@@ -41,6 +41,7 @@ var (
RoleTiSparkWorker = "tispark-worker"
ErrNoTiSparkMaster = errors.New("there must be a Spark master node if you want to use the TiSpark component")
ErrMultipleTiSparkMaster = errors.New("a TiSpark enabled cluster with more than 1 Spark master node is not supported")
ErrMultipleTisparkWorker = errors.New("multiple TiSpark workers on the same host is not supported by Spark")
)

type (
@@ -515,6 +516,17 @@ func (s *Specification) validateTiSparkSpec() error {
return ErrMultipleTiSparkMaster
}

// Multiple workers on the same host is not supported by Spark
if len(s.TiSparkWorkers) > 1 {
cnt := make(map[string]int)
for _, w := range s.TiSparkWorkers {
if cnt[w.Host] > 0 {
return errors.Annotatef(ErrMultipleTisparkWorker, "the host %s is duplicated", w.Host)
}
cnt[w.Host]++
}
}

return nil
}
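
The added validation is a single pass over the worker list with a counting map. A minimal standalone sketch of the same idea (the helper name findDuplicateHost and the sample hosts are illustrative, not part of the PR):

package main

import "fmt"

// findDuplicateHost mirrors the added check: one pass over the worker
// hosts with a counting map, returning the first host seen twice.
func findDuplicateHost(hosts []string) (string, bool) {
	cnt := make(map[string]int)
	for _, h := range hosts {
		if cnt[h] > 0 {
			return h, true
		}
		cnt[h]++
	}
	return "", false
}

func main() {
	// Sample worker hosts for illustration only.
	workers := []string{"172.16.5.138", "172.16.5.139", "172.16.5.139"}
	if h, ok := findDuplicateHost(workers); ok {
		fmt.Printf("the host %s is duplicated\n", h)
	}
}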

19 changes: 19 additions & 0 deletions pkg/cluster/spec/spec_test.go
@@ -770,4 +770,23 @@ tispark_workers:
`), &topo)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "there must be a Spark master node if you want to use the TiSpark component")

err = yaml.Unmarshal([]byte(`
pd_servers:
- host: 172.16.5.138
port: 1234
tispark_masters:
- host: 172.16.5.138
port: 1236
tispark_workers:
- host: 172.16.5.138
port: 1235
- host: 172.16.5.139
port: 1235
- host: 172.16.5.139
port: 1236
web_port: 8089
`), &topo)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "the host 172.16.5.139 is duplicated: multiple TiSpark workers on the same host is not supported by Spark")
}
36 changes: 26 additions & 10 deletions pkg/cluster/spec/tispark.go
@@ -32,11 +32,13 @@ import (
// TiSparkMasterSpec is the topology specification for TiSpark master node
type TiSparkMasterSpec struct {
Host string `yaml:"host"`
ListenHost string `yaml:"listen_host,omitempty"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
Imported bool `yaml:"imported,omitempty"`
Port int `yaml:"port" default:"7077"`
WebPort int `yaml:"web_port" default:"8080"`
DeployDir string `yaml:"deploy_dir,omitempty"`
JavaHome string `yaml:"java_home,omitempty" validate:"java_home:editable"`
SparkConfigs map[string]interface{} `yaml:"spark_config,omitempty" validate:"spark_config:editable"`
SparkEnvs map[string]string `yaml:"spark_env,omitempty" validate:"spark_env:editable"`
Arch string `yaml:"arch,omitempty"`
@@ -71,14 +73,16 @@ func (s TiSparkMasterSpec) Status(pdList ...string) string {

// TiSparkWorkerSpec is the topology specification for TiSpark slave nodes
type TiSparkWorkerSpec struct {
Host string `yaml:"host"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
Imported bool `yaml:"imported,omitempty"`
Port int `yaml:"port" default:"7078"`
WebPort int `yaml:"web_port" default:"8081"`
DeployDir string `yaml:"deploy_dir,omitempty"`
Arch string `yaml:"arch,omitempty"`
OS string `yaml:"os,omitempty"`
Host string `yaml:"host"`
ListenHost string `yaml:"listen_host,omitempty"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
Imported bool `yaml:"imported,omitempty"`
Port int `yaml:"port" default:"7078"`
WebPort int `yaml:"web_port" default:"8081"`
DeployDir string `yaml:"deploy_dir,omitempty"`
JavaHome string `yaml:"java_home,omitempty" validate:"java_home:editable"`
Arch string `yaml:"arch,omitempty"`
OS string `yaml:"os,omitempty"`
}
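
For reference, the two new user-facing keys on both spec types are listen_host and java_home. A minimal sketch of how they parse, using a simplified local mirror of the worker spec and gopkg.in/yaml.v2 (the struct name, sample addresses, and JDK path are assumptions for illustration):

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// workerSpec is a simplified, illustrative mirror of TiSparkWorkerSpec,
// kept only to show how the new listen_host and java_home keys map onto
// the struct tags added in this PR.
type workerSpec struct {
	Host       string `yaml:"host"`
	ListenHost string `yaml:"listen_host,omitempty"`
	Port       int    `yaml:"port"`
	WebPort    int    `yaml:"web_port"`
	JavaHome   string `yaml:"java_home,omitempty"`
}

func main() {
	data := `
host: 172.16.5.139
listen_host: 0.0.0.0
port: 7078
web_port: 8081
java_home: /usr/lib/jvm/jre-1.8.0
`
	var w workerSpec
	if err := yaml.Unmarshal([]byte(data), &w); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", w)
	// Prints something like:
	// {Host:172.16.5.139 ListenHost:0.0.0.0 Port:7078 WebPort:8081 JavaHome:/usr/lib/jvm/jre-1.8.0}
}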

// Role returns the component role of the instance
@@ -165,6 +169,11 @@ func (i *TiSparkMasterInstance) GetCustomEnvs() map[string]string {
return v.Interface().(map[string]string)
}

// GetJavaHome returns the java_home value in spec
func (i *TiSparkMasterInstance) GetJavaHome() string {
return reflect.ValueOf(i.InstanceSpec).FieldByName("JavaHome").String()
}
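
Both instance types read java_home back through reflection because the instance only stores its spec as an interface value. A small sketch of that pattern in isolation (the spec type and path below are hypothetical):

package main

import (
	"fmt"
	"reflect"
)

// spec stands in for any TiSpark spec struct that carries a JavaHome field.
// GetJavaHome above reads the field back by name via reflection, since the
// concrete spec type is not known statically at the call site.
type spec struct {
	JavaHome string
}

func javaHomeOf(instanceSpec interface{}) string {
	return reflect.ValueOf(instanceSpec).FieldByName("JavaHome").String()
}

func main() {
	fmt.Println(javaHomeOf(spec{JavaHome: "/opt/jdk8"})) // "/opt/jdk8"
}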

// InitConfig implement Instance interface
func (i *TiSparkMasterInstance) InitConfig(e executor.Executor, clusterName, clusterVersion, deployUser string, paths meta.DirPaths) error {
// generate systemd service to invoke spark's start/stop scripts
@@ -173,7 +182,7 @@ func (i *TiSparkMasterInstance) InitConfig(e executor.Executor, clusterName, clu
port := i.GetPort()
sysCfg := filepath.Join(paths.Cache, fmt.Sprintf("%s-%s-%d.service", comp, host, port))

systemCfg := system.NewTiSparkConfig(comp, deployUser, paths.Deploy)
systemCfg := system.NewTiSparkConfig(comp, deployUser, paths.Deploy, i.GetJavaHome())

if err := systemCfg.ConfigToFile(sysCfg); err != nil {
return errors.Trace(err)
@@ -210,6 +219,7 @@ func (i *TiSparkMasterInstance) InitConfig(e executor.Executor, clusterName, clu
}

env := scripts.NewTiSparkEnv(host).
WithLocalIP(i.GetListenHost()).
WithMasterPorts(i.usedPorts[0], i.usedPorts[1]).
WithCustomEnv(i.GetCustomEnvs())
// transfer spark-env.sh file
@@ -285,6 +295,11 @@ type TiSparkWorkerInstance struct {
instance
}

// GetJavaHome returns the java_home value in spec
func (i *TiSparkWorkerInstance) GetJavaHome() string {
return reflect.ValueOf(i.InstanceSpec).FieldByName("JavaHome").String()
}

// InitConfig implement Instance interface
func (i *TiSparkWorkerInstance) InitConfig(e executor.Executor, clusterName, clusterVersion, deployUser string, paths meta.DirPaths) error {
// generate systemd service to invoke spark's start/stop scripts
@@ -293,7 +308,7 @@ func (i *TiSparkWorkerInstance) InitConfig(e executor.Executor, clusterName, clu
port := i.GetPort()
sysCfg := filepath.Join(paths.Cache, fmt.Sprintf("%s-%s-%d.service", comp, host, port))

systemCfg := system.NewTiSparkConfig(comp, deployUser, paths.Deploy)
systemCfg := system.NewTiSparkConfig(comp, deployUser, paths.Deploy, i.GetJavaHome())

if err := systemCfg.ConfigToFile(sysCfg); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -330,6 +345,7 @@ func (i *TiSparkWorkerInstance) InitConfig(e executor.Executor, clusterName, clu
}

env := scripts.NewTiSparkEnv(i.topo.TiSparkMasters[0].Host).
WithLocalIP(i.GetListenHost()).
WithMasterPorts(i.topo.TiSparkMasters[0].Port, i.topo.TiSparkMasters[0].WebPort).
WithWorkerPorts(i.usedPorts[0], i.usedPorts[1]).
WithCustomEnv(i.instance.topo.TiSparkMasters[0].SparkEnvs)
19 changes: 13 additions & 6 deletions pkg/cluster/template/scripts/tispark.go
@@ -21,12 +21,13 @@ import (

// TiSparkEnv represent the data to generate TiSpark environment config
type TiSparkEnv struct {
TiSparkMaster string
MasterPort int
WorkerPort int
MasterUIPort int
WorkerUIPort int
CustomEnvs map[string]string
TiSparkMaster string
TiSparkLocalIP string
MasterPort int
WorkerPort int
MasterUIPort int
WorkerUIPort int
CustomEnvs map[string]string
}

// NewTiSparkEnv returns a TiSparkConfig
@@ -40,6 +41,12 @@ func (c *TiSparkEnv) WithCustomEnv(m map[string]string) *TiSparkEnv {
return c
}

// WithLocalIP sets the local IP of the TiSpark node
func (c *TiSparkEnv) WithLocalIP(ip string) *TiSparkEnv {
c.TiSparkLocalIP = ip
return c
}

// WithMasterPorts sets port for masters
func (c *TiSparkEnv) WithMasterPorts(port, ui int) *TiSparkEnv {
c.MasterPort = port
4 changes: 3 additions & 1 deletion pkg/cluster/template/systemd/tispark.go
@@ -28,13 +28,14 @@ type TiSparkConfig struct {
ServiceName string
User string
DeployDir string
JavaHome string
// Takes one of no, on-success, on-failure, on-abnormal, on-watchdog, on-abort, or always.
// The template sets it to always if this is not set.
Restart string
}

// NewTiSparkConfig returns a Config with given arguments
func NewTiSparkConfig(service, user, deployDir string) *TiSparkConfig {
func NewTiSparkConfig(service, user, deployDir, javaHome string) *TiSparkConfig {
if strings.Contains(service, "master") {
service = "master"
} else if strings.Contains(service, "worker") {
@@ -44,6 +45,7 @@ func NewTiSparkConfig(service, user, deployDir string) *TiSparkConfig {
ServiceName: service,
User: user,
DeployDir: deployDir,
JavaHome: javaHome,
}
}

3 changes: 3 additions & 0 deletions templates/scripts/spark-env.sh.tpl
@@ -86,3 +86,6 @@ SPARK_WORKER_PORT={{.WorkerPort}}
{{- if ne .WorkerUIPort 0}}
SPARK_WORKER_WEBUI_PORT={{.WorkerUIPort}}
{{- end}}
{{- if ne .TiSparkLocalIP ""}}
SPARK_LOCAL_IP={{.TiSparkLocalIP}}
{{- end}}
5 changes: 4 additions & 1 deletion templates/systemd/tispark.service.tpl
@@ -4,14 +4,17 @@ After=syslog.target network.target remote-fs.target nss-lookup.target

[Service]
User={{.User}}
{{- if ne .JavaHome ""}}
Environment="JAVA_HOME={{.JavaHome}}"
{{- end}}
ExecStart={{.DeployDir}}/sbin/start-{{.ServiceName}}.sh
ExecStop={{.DeployDir}}/sbin/stop-{{.ServiceName}}.sh
Type=forking
{{- if .Restart}}
Restart={{.Restart}}
{{else}}
Restart=always
{{end}}
{{- end}}
RestartSec=15s
SendSIGKILL=no
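
The systemd template now emits the JAVA_HOME environment only when java_home is set, and the switch from {{end}} to {{- end}} trims the newline that used to leave a blank line after the default Restart=always. A cut-down rendering sketch of those two details using text/template (the user name and JDK path are placeholders, and the template below is a simplified stand-in for tispark.service.tpl):

package main

import (
	"fmt"
	"strings"
	"text/template"
)

// A cut-down version of the unit template: Environment is emitted only
// when JavaHome is non-empty, and "{{- end}}" trims the preceding newline
// so the default Restart=always case leaves no blank line behind it.
const unitTpl = `[Service]
User={{.User}}
{{- if ne .JavaHome ""}}
Environment="JAVA_HOME={{.JavaHome}}"
{{- end}}
{{- if .Restart}}
Restart={{.Restart}}
{{else}}
Restart=always
{{- end}}
RestartSec=15s
`

type unitData struct {
	User, JavaHome, Restart string
}

func render(d unitData) string {
	var b strings.Builder
	if err := template.Must(template.New("unit").Parse(unitTpl)).Execute(&b, d); err != nil {
		panic(err)
	}
	return b.String()
}

func main() {
	// java_home set (path is a placeholder): the Environment line appears.
	fmt.Print(render(unitData{User: "tidb", JavaHome: "/usr/lib/jvm/jre-1.8.0"}))
	fmt.Println("---")
	// java_home empty: the Environment line is skipped entirely.
	fmt.Print(render(unitData{User: "tidb"}))
}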
