Skip to content

Commit

Permalink
Merge branch 'master' into issue-1757
Browse files Browse the repository at this point in the history
  • Loading branch information
AstroProfundis authored Feb 21, 2022
2 parents e261e25 + eb01ede commit c96efa7
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 83 deletions.
2 changes: 2 additions & 0 deletions components/bench/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func registerTpcc(root *cobra.Command) {
}

cmd.PersistentFlags().IntVar(&tpccConfig.Parts, "parts", 1, "Number to partition warehouses")
cmd.PersistentFlags().IntVar(&tpccConfig.PartitionType, "partition-type", 1, "Partition type (1 - HASH, 2 - RANGE, 3 - LIST (like HASH), 4 - LIST (like RANGE)")
cmd.PersistentFlags().IntVar(&tpccConfig.Warehouses, "warehouses", 10, "Number of warehouses")
cmd.PersistentFlags().BoolVar(&tpccConfig.CheckAll, "check-all", false, "Run all consistency checks")
var cmdPrepare = &cobra.Command{
Expand Down Expand Up @@ -96,6 +97,7 @@ func registerTpcc(root *cobra.Command) {
}
cmdRun.PersistentFlags().BoolVar(&tpccConfig.Wait, "wait", false, "including keying & thinking time described on TPC-C Standard Specification")
cmdRun.PersistentFlags().DurationVar(&tpccConfig.MaxMeasureLatency, "max-measure-latency", measurement.DefaultMaxLatency, "max measure latency in millisecond")
cmdRun.PersistentFlags().IntSliceVar(&tpccConfig.Weight, "weight", []int{45, 43, 4, 4, 4}, "Weight for NewOrder, Payment, OrderStatus, Delivery, StockLevel")

var cmdCleanup = &cobra.Command{
Use: "cleanup",
Expand Down
8 changes: 8 additions & 0 deletions components/bench/tpch.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ func registerTpch(root *cobra.Command) {
"tidb_index_serial_scan_concurrency",
1,
"tidb_index_serial_scan_concurrency param for analyze jobs")
cmdPrepare.PersistentFlags().StringVar(&tpchConfig.OutputType,
"output-type",
"",
"Output file type. If empty, then load data to db. Current only support csv")
cmdPrepare.PersistentFlags().StringVar(&tpchConfig.OutputDir,
"output-dir",
"",
"Output directory for generating file if specified")

var cmdRun = &cobra.Command{
Use: "run",
Expand Down
2 changes: 1 addition & 1 deletion components/cluster/command/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ is the cluster name. Some checks are ignore in this mode, such as port and dir
conflict checks with other clusters
If you want to check the scale-out topology, please use execute the following command
' check <cluster-name> <scale-out.yml> --cluster '
it will the new instances `,
it will check the new instances `,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 1 && len(args) != 2 {
return cmd.Help()
Expand Down
16 changes: 3 additions & 13 deletions components/dm/spec/topology_dm.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,9 +687,9 @@ func (s *Specification) GetMasterList() []string {
return masterList
}

// FillHostArch fills the topology with the given host->arch
func (s *Specification) FillHostArch(hostArch map[string]string) error {
return spec.FillHostArch(s, hostArch)
// FillHostArchOrOS fills the topology with the given host->arch
func (s *Specification) FillHostArchOrOS(hostArch map[string]string, fullType spec.FullHostType) error {
return spec.FillHostArchOrOS(s, hostArch, fullType)
}

// Merge returns a new Topology which sum old ones
Expand Down Expand Up @@ -796,12 +796,6 @@ func setDMCustomDefaults(globalOptions *GlobalOptions, field reflect.Value) erro
field.Field(j).Set(reflect.ValueOf(globalOptions.LogDir))
}
case "Arch":
// default values of globalOptions are set before fillCustomDefaults in Unmarshal
// so the globalOptions.Arch already has its default value set, no need to check again
if field.Field(j).String() == "" {
field.Field(j).Set(reflect.ValueOf(globalOptions.Arch))
}

switch strings.ToLower(field.Field(j).String()) {
// replace "x86_64" with amd64, they are the same in our repo
case "x86_64":
Expand All @@ -816,10 +810,6 @@ func setDMCustomDefaults(globalOptions *GlobalOptions, field reflect.Value) erro
field.Field(j).Set(reflect.ValueOf(strings.ToLower(field.Field(j).String())))
}
case "OS":
// default value of globalOptions.OS is already set, same as "Arch"
if field.Field(j).String() == "" {
field.Field(j).Set(reflect.ValueOf(globalOptions.OS))
}
// convert to lower case
if field.Field(j).String() != "" {
field.Field(j).Set(reflect.ValueOf(strings.ToLower(field.Field(j).String())))
Expand Down
6 changes: 6 additions & 0 deletions components/dm/spec/topology_dm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ master_servers:
arch: "arm64"
worker_servers:
- host: 172.16.5.138
arch: "aarch64"
`), &topo)
assert.Nil(t, err)

Expand All @@ -179,9 +180,11 @@ global:
master_servers:
- host: 172.16.5.138
arch: "aarch64"
os: "linux"
worker_servers:
- host: 172.16.5.138
arch: "amd64"
os: "linux"
`), &topo)
assert.NotNil(t, err)
assert.Equal(t, "platform mismatch for '172.16.5.138' between 'master_servers:linux/arm64' and 'worker_servers:linux/amd64'", err.Error())
Expand All @@ -195,8 +198,11 @@ global:
master_servers:
- host: 172.16.5.138
os: "darwin"
arch: "aarch64"
worker_servers:
- host: 172.16.5.138
os: "linux"
arch: "aarch64"
`), &topo)
assert.NotNil(t, err)
assert.Equal(t, "platform mismatch for '172.16.5.138' between 'master_servers:darwin/arm64' and 'worker_servers:linux/arm64'", err.Error())
Expand Down
9 changes: 9 additions & 0 deletions pkg/cluster/ansible/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func parseGroupVars(ctx context.Context, dir, ansCfgFile string, clsMeta *spec.C
SSHPort: getHostPort(srv, ansCfg),
Imported: true,
Arch: "amd64",
OS: "linux",
}

if port, ok := grpVars["tidb_port"]; ok {
Expand Down Expand Up @@ -223,6 +224,7 @@ func parseGroupVars(ctx context.Context, dir, ansCfgFile string, clsMeta *spec.C
SSHPort: getHostPort(srv, ansCfg),
Imported: true,
Arch: "amd64",
OS: "linux",
}

if port, ok := grpVars["tikv_port"]; ok {
Expand Down Expand Up @@ -269,6 +271,7 @@ func parseGroupVars(ctx context.Context, dir, ansCfgFile string, clsMeta *spec.C
SSHPort: getHostPort(srv, ansCfg),
Imported: true,
Arch: "amd64",
OS: "linux",
}
if tmpIns.Host != srv.Name {
tmpIns.Name = srv.Name // use alias as the name of PD
Expand Down Expand Up @@ -318,6 +321,7 @@ func parseGroupVars(ctx context.Context, dir, ansCfgFile string, clsMeta *spec.C
SSHPort: getHostPort(srv, ansCfg),
Imported: true,
Arch: "amd64",
OS: "linux",
}

if tcpPort, ok := grpVars["tcp_port"]; ok {
Expand Down Expand Up @@ -396,6 +400,7 @@ func parseGroupVars(ctx context.Context, dir, ansCfgFile string, clsMeta *spec.C
SSHPort: getHostPort(srv, ansCfg),
Imported: true,
Arch: "amd64",
OS: "linux",
}

if port, ok := grpVars["prometheus_port"]; ok {
Expand Down Expand Up @@ -442,6 +447,7 @@ func parseGroupVars(ctx context.Context, dir, ansCfgFile string, clsMeta *spec.C
SSHPort: getHostPort(srv, ansCfg),
Imported: true,
Arch: "amd64",
OS: "linux",
}

if port, ok := grpVars["alertmanager_port"]; ok {
Expand Down Expand Up @@ -482,6 +488,7 @@ func parseGroupVars(ctx context.Context, dir, ansCfgFile string, clsMeta *spec.C
SSHPort: getHostPort(srv, ansCfg),
Imported: true,
Arch: "amd64",
OS: "linux",
}

if port, ok := grpVars["grafana_port"]; ok {
Expand Down Expand Up @@ -527,6 +534,7 @@ func parseGroupVars(ctx context.Context, dir, ansCfgFile string, clsMeta *spec.C
SSHPort: getHostPort(srv, ansCfg),
Imported: true,
Arch: "amd64",
OS: "linux",
}

// nothing in pump_servers.yml
Expand Down Expand Up @@ -569,6 +577,7 @@ func parseGroupVars(ctx context.Context, dir, ansCfgFile string, clsMeta *spec.C
SSHPort: getHostPort(srv, ansCfg),
Imported: true,
Arch: "amd64",
OS: "linux",
}

// nothing in drainer_servers.yml
Expand Down
2 changes: 0 additions & 2 deletions pkg/cluster/manager/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ func buildScaleOutTask(
// Deploy monitor relevant components to remote
dlTasks, dpTasks, err := buildMonitoredDeployTask(
m,
name,
uninitializedHosts,
noAgentHosts,
topo.BaseTopo().GlobalOptions,
Expand Down Expand Up @@ -407,7 +406,6 @@ type hostInfo struct {

func buildMonitoredDeployTask(
m *Manager,
name string,
uniqueHosts map[string]hostInfo, // host -> ssh-port, os, arch
noAgentHosts set.StringSet, // hosts that do not deploy monitor agents
globalOptions *spec.GlobalOptions,
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/manager/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (m *Manager) CheckCluster(clusterOrTopoName, scaleoutTopo string, opt Check
}
}

if err := m.fillHostArch(sshConnProps, sshProxyProps, &topo, &gOpt, opt.User); err != nil {
if err := m.fillHost(sshConnProps, sshProxyProps, &topo, &gOpt, opt.User); err != nil {
return err
}

Expand Down
12 changes: 2 additions & 10 deletions pkg/cluster/manager/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,7 @@ func (m *Manager) Deploy(
}
}

clusterList, err := m.specManager.GetAllClusters()
if err != nil {
return err
}
if err := spec.CheckClusterPortConflict(clusterList, name, topo); err != nil {
return err
}
if err := spec.CheckClusterDirConflict(clusterList, name, topo); err != nil {
if err := checkConflict(m, name, topo); err != nil {
return err
}

Expand All @@ -157,7 +150,7 @@ func (m *Manager) Deploy(
}
}

if err := m.fillHostArch(sshConnProps, sshProxyProps, topo, &gOpt, opt.User); err != nil {
if err := m.fillHost(sshConnProps, sshProxyProps, topo, &gOpt, opt.User); err != nil {
return err
}

Expand Down Expand Up @@ -325,7 +318,6 @@ func (m *Manager) Deploy(
// Deploy monitor relevant components to remote
dlTasks, dpTasks, err := buildMonitoredDeployTask(
m,
name,
uniqueHosts,
noAgentHosts,
globalOptions,
Expand Down
92 changes: 58 additions & 34 deletions pkg/cluster/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,42 +166,66 @@ func (m *Manager) sshTaskBuilder(name string, topo spec.Topology, user string, g
), nil
}

func (m *Manager) fillHostArch(s, p *tui.SSHConnectionProps, topo spec.Topology, gOpt *operator.Options, user string) error {
// fillHost full host cpu-arch and kernel-name
func (m *Manager) fillHost(s, p *tui.SSHConnectionProps, topo spec.Topology, gOpt *operator.Options, user string) error {
if err := m.fillHostArchOrOS(s, p, topo, gOpt, user, spec.FullArchType); err != nil {
return err
}

return m.fillHostArchOrOS(s, p, topo, gOpt, user, spec.FullOSType)
}

// fillHostArchOrOS full host cpu-arch or kernel-name
func (m *Manager) fillHostArchOrOS(s, p *tui.SSHConnectionProps, topo spec.Topology, gOpt *operator.Options, user string, fullType spec.FullHostType) error {
globalSSHType := topo.BaseTopo().GlobalOptions.SSHType
hostArch := map[string]string{}
hostArchOrOS := map[string]string{}
var detectTasks []*task.StepDisplay

topo.IterInstance(func(inst spec.Instance) {
if inst.Arch() != "" {
if fullType == spec.FullOSType {
if inst.OS() != "" {
return
}
} else if inst.Arch() != "" {
return
}
if _, ok := hostArch[inst.GetHost()]; ok {

if _, ok := hostArchOrOS[inst.GetHost()]; ok {
return
}
hostArch[inst.GetHost()] = ""

tf := task.NewBuilder(m.logger).
RootSSH(
inst.GetHost(),
inst.GetSSHPort(),
user,
s.Password,
s.IdentityFile,
s.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.OptTimeout,
gOpt.SSHProxyHost,
gOpt.SSHProxyPort,
gOpt.SSHProxyUser,
p.Password,
p.IdentityFile,
p.IdentityFilePassphrase,
gOpt.SSHProxyTimeout,
gOpt.SSHType,
globalSSHType,
).
Shell(inst.GetHost(), "uname -m", "", false).
BuildAsStep(fmt.Sprintf(" - Detecting node %s", inst.GetHost()))
detectTasks = append(detectTasks, tf)
hostArchOrOS[inst.GetHost()] = ""

tf := task.NewSimpleUerSSH(m.logger, inst.GetHost(), inst.GetSSHPort(), user, *gOpt, p, globalSSHType)
if s.Password != "" || user == "root" {
tf = task.NewBuilder(m.logger).
RootSSH(
inst.GetHost(),
inst.GetSSHPort(),
user,
s.Password,
s.IdentityFile,
s.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.OptTimeout,
gOpt.SSHProxyHost,
gOpt.SSHProxyPort,
gOpt.SSHProxyUser,
p.Password,
p.IdentityFile,
p.IdentityFilePassphrase,
gOpt.SSHProxyTimeout,
gOpt.SSHType,
globalSSHType,
)
}

switch fullType {
case spec.FullOSType:
tf = tf.Shell(inst.GetHost(), "uname -s", "", false)
default:
tf = tf.Shell(inst.GetHost(), "uname -m", "", false)
}
detectTasks = append(detectTasks, tf.BuildAsStep(fmt.Sprintf(" - Detecting node %s %s info", inst.GetHost(), string(fullType))))
})
if len(detectTasks) == 0 {
return nil
Expand All @@ -213,19 +237,19 @@ func (m *Manager) fillHostArch(s, p *tui.SSHConnectionProps, topo spec.Topology,
m.logger,
)
t := task.NewBuilder(m.logger).
ParallelStep("+ Detect CPU Arch", false, detectTasks...).
ParallelStep(fmt.Sprintf("+ Detect CPU %s Name", string(fullType)), false, detectTasks...).
Build()

if err := t.Execute(ctx); err != nil {
return perrs.Annotate(err, "failed to fetch cpu arch")
return perrs.Annotate(err, "failed to fetch cpu-arch or kernel-name")
}

for host := range hostArch {
for host := range hostArchOrOS {
stdout, _, ok := ctxt.GetInner(ctx).GetOutputs(host)
if !ok {
return fmt.Errorf("no check results found for %s", host)
}
hostArch[host] = strings.Trim(string(stdout), "\n")
hostArchOrOS[host] = strings.Trim(string(stdout), "\n")
}
return topo.FillHostArch(hostArch)
return topo.FillHostArchOrOS(hostArchOrOS, fullType)
}
2 changes: 1 addition & 1 deletion pkg/cluster/manager/scale_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (m *Manager) ScaleOut(
}
}

if err := m.fillHostArch(sshConnProps, sshProxyProps, newPart, &gOpt, opt.User); err != nil {
if err := m.fillHost(sshConnProps, sshProxyProps, newPart, &gOpt, opt.User); err != nil {
return err
}

Expand Down
Loading

0 comments on commit c96efa7

Please sign in to comment.