Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

playground: support ticdc for playground #777

Merged
merged 7 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions components/playground/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ func buildCommands(tp CommandType, opt *bootOptions) (cmds []Command) {

cmds = append(cmds, c)
}
for i := 0; i < opt.ticdc.Num; i++ {
c := Command{
CommandType: tp,
ComponentID: "ticdc",
Config: opt.ticdc,
}

cmds = append(cmds, c)
}
for i := 0; i < opt.drainer.Num; i++ {
c := Command{
CommandType: tp,
Expand Down Expand Up @@ -127,6 +136,7 @@ func newScaleOut() *cobra.Command {
cmd.Flags().IntVarP(&opt.tikv.Num, "kv", "", opt.tikv.Num, "TiKV instance number")
cmd.Flags().IntVarP(&opt.pd.Num, "pd", "", opt.pd.Num, "PD instance number")
cmd.Flags().IntVarP(&opt.tiflash.Num, "tiflash", "", opt.tiflash.Num, "TiFlash instance number")
cmd.Flags().IntVarP(&opt.ticdc.Num, "ticdc", "", opt.ticdc.Num, "TiCDC instance number")
cmd.Flags().IntVarP(&opt.pump.Num, "pump", "", opt.pump.Num, "Pump instance number")
cmd.Flags().IntVarP(&opt.drainer.Num, "drainer", "", opt.pump.Num, "Drainer instance number")

Expand All @@ -144,6 +154,7 @@ func newScaleOut() *cobra.Command {
cmd.Flags().StringVarP(&opt.tikv.BinPath, "kv.binpath", "", opt.tikv.BinPath, "TiKV instance binary path")
cmd.Flags().StringVarP(&opt.pd.BinPath, "pd.binpath", "", opt.pd.BinPath, "PD instance binary path")
cmd.Flags().StringVarP(&opt.tiflash.BinPath, "tiflash.binpath", "", opt.tiflash.BinPath, "TiFlash instance binary path")
cmd.Flags().StringVarP(&opt.ticdc.BinPath, "ticdc.binpath", "", opt.ticdc.BinPath, "TiCDC instance binary path")
cmd.Flags().StringVarP(&opt.pump.BinPath, "pump.binpath", "", opt.pump.BinPath, "Pump instance binary path")
cmd.Flags().StringVarP(&opt.drainer.BinPath, "drainer.binpath", "", opt.drainer.BinPath, "Drainer instance binary path")

Expand Down
120 changes: 120 additions & 0 deletions components/playground/instance/ticdc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package instance

import (
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
"time"

"github.com/pingcap/tiup/pkg/repository/v0manifest"
"github.com/pingcap/tiup/pkg/utils"
)

// TiCDC represent a ticdc instance.
type TiCDC struct {
instance
pds []*PDInstance
Process
}

var _ Instance = &TiCDC{}

// NewTiCDC create a TiCDC instance.
func NewTiCDC(binPath string, dir, host, configPath string, id int, pds []*PDInstance) *TiCDC {
ticdc := &TiCDC{
instance: instance{
BinPath: binPath,
ID: id,
Dir: dir,
Host: host,
Port: utils.MustGetFreePort(host, 8300),
ConfigPath: configPath,
},
pds: pds,
}
ticdc.StatusPort = ticdc.Port
july2993 marked this conversation as resolved.
Show resolved Hide resolved
return ticdc
}

// NodeID return the node id of pump.
func (c *TiCDC) NodeID() string {
return fmt.Sprintf("ticdc_%d", c.ID)
}

// Ready return nil when pump is ready to serve.
zier-one marked this conversation as resolved.
Show resolved Hide resolved
func (c *TiCDC) Ready(ctx context.Context) error {
zier-one marked this conversation as resolved.
Show resolved Hide resolved
url := fmt.Sprintf("http://%s:%d/status", c.Host, c.Port)

for {
resp, err := http.Get(url)
if err == nil && resp.StatusCode == 200 {
return nil
}

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
// just retry
}
}
}

// Addr return the address of Pump.
func (c *TiCDC) Addr() string {
return fmt.Sprintf("%s:%d", advertiseHost(c.Host), c.Port)
}

// Start implements Instance interface.
func (c *TiCDC) Start(ctx context.Context, version v0manifest.Version) error {
if err := os.MkdirAll(c.Dir, 0755); err != nil {
return err
}

var urls []string
for _, pd := range c.pds {
urls = append(urls, fmt.Sprintf("http://%s:%d", pd.Host, pd.StatusPort))
}

args := []string{
"server",
fmt.Sprintf("--addr=%s:%d", c.Host, c.Port),
fmt.Sprintf("--advertise-addr=%s:%d", advertiseHost(c.Host), c.Port),
fmt.Sprintf("--pd=%s", strings.Join(urls, ",")),
fmt.Sprintf("--log-file=%s", c.LogFile()),
}

var err error
if c.Process, err = NewComponentProcess(ctx, c.Dir, c.BinPath, "cdc", version, args...); err != nil {
return err
}
logIfErr(c.Process.SetOutputFile(c.LogFile()))

return c.Process.Start()
}

// Component return component name.
func (c *TiCDC) Component() string {
return "ticdc"
}

// LogFile return the log file.
func (c *TiCDC) LogFile() string {
return filepath.Join(c.Dir, "ticdc.log")
}
3 changes: 3 additions & 0 deletions components/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type bootOptions struct {
tidb instance.Config
tikv instance.Config
tiflash instance.Config
ticdc instance.Config
pump instance.Config
drainer instance.Config
host string
Expand Down Expand Up @@ -198,6 +199,7 @@ Examples:
rootCmd.Flags().IntVarP(&opt.tikv.Num, "kv", "", opt.tikv.Num, "TiKV instance number")
rootCmd.Flags().IntVarP(&opt.pd.Num, "pd", "", opt.pd.Num, "PD instance number")
rootCmd.Flags().IntVarP(&opt.tiflash.Num, "tiflash", "", opt.tiflash.Num, "TiFlash instance number")
rootCmd.Flags().IntVarP(&opt.ticdc.Num, "ticdc", "", opt.ticdc.Num, "TiCDC instance number")
rootCmd.Flags().IntVarP(&opt.pump.Num, "pump", "", opt.pump.Num, "Pump instance number")
rootCmd.Flags().IntVarP(&opt.drainer.Num, "drainer", "", opt.drainer.Num, "Drainer instance number")

Expand All @@ -217,6 +219,7 @@ Examples:
rootCmd.Flags().StringVarP(&opt.tikv.BinPath, "kv.binpath", "", opt.tikv.BinPath, "TiKV instance binary path")
rootCmd.Flags().StringVarP(&opt.pd.BinPath, "pd.binpath", "", opt.pd.BinPath, "PD instance binary path")
rootCmd.Flags().StringVarP(&opt.tiflash.BinPath, "tiflash.binpath", "", opt.tiflash.BinPath, "TiFlash instance binary path")
rootCmd.Flags().StringVarP(&opt.ticdc.BinPath, "ticdc.binpath", "", opt.ticdc.BinPath, "TiCDC instance binary path")
rootCmd.Flags().StringVarP(&opt.pump.BinPath, "pump.binpath", "", opt.pump.BinPath, "Pump instance binary path")
rootCmd.Flags().StringVarP(&opt.drainer.BinPath, "drainer.binpath", "", opt.drainer.BinPath, "Drainer instance binary path")

Expand Down
26 changes: 26 additions & 0 deletions components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Playground struct {
tikvs []*instance.TiKVInstance
tidbs []*instance.TiDBInstance
tiflashs []*instance.TiFlashInstance
ticdcs []*instance.TiCDC
pumps []*instance.Pump
drainers []*instance.Drainer
startedInstances []instance.Instance
Expand Down Expand Up @@ -295,6 +296,12 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error {
p.tidbs = append(p.tidbs[:i], p.tidbs[i+1:]...)
}
}
case "ticdc":
for i := 0; i < len(p.ticdcs); i++ {
if p.ticdcs[i].Pid() == pid {
p.ticdcs = append(p.ticdcs[:i], p.ticdcs[i+1:]...)
}
}
case "tiflash":
for i := 0; i < len(p.tiflashs); i++ {
if p.tiflashs[i].Pid() == pid {
Expand Down Expand Up @@ -393,6 +400,8 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e
return p.sanitizeConfig(p.bootOptions.tidb, cfg)
case "tiflash":
return p.sanitizeConfig(p.bootOptions.tiflash, cfg)
case "ticdc":
return p.sanitizeConfig(p.bootOptions.ticdc, cfg)
case "pump":
return p.sanitizeConfig(p.bootOptions.pump, cfg)
case "drainer":
Expand Down Expand Up @@ -548,6 +557,13 @@ func (p *Playground) WalkInstances(fn func(componentID string, ins instance.Inst
}
}

for _, ins := range p.ticdcs {
err := fn("ticdc", ins)
if err != nil {
return errors.AddStack(err)
}
}

for _, ins := range p.drainers {
err := fn("drainer", ins)
if err != nil {
Expand Down Expand Up @@ -618,6 +634,10 @@ func (p *Playground) addInstance(componentID string, cfg instance.Config) (ins i
inst := instance.NewTiFlashInstance(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds, p.tidbs)
ins = inst
p.tiflashs = append(p.tiflashs, inst)
case "ticdc":
inst := instance.NewTiCDC(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds)
ins = inst
p.ticdcs = append(p.ticdcs, inst)
case "pump":
inst := instance.NewPump(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds)
ins = inst
Expand Down Expand Up @@ -699,6 +719,12 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
return errors.AddStack(err)
}
}
for i := 0; i < options.ticdc.Num; i++ {
_, err := p.addInstance("ticdc", options.ticdc)
if err != nil {
return errors.AddStack(err)
}
}
for i := 0; i < options.drainer.Num; i++ {
_, err := p.addInstance("drainer", options.drainer)
if err != nil {
Expand Down