Skip to content

Commit

Permalink
config,lightning: Implements server mode (pingcap#198)
Browse files Browse the repository at this point in the history
* test: speed up TestGetJSON

Force a shorter timeout on the HTTP client, so that accessing
`http://not-exists` won't take 30 seconds.

* config,lightning: implement "server mode"

In "Server Mode" Lightning will wait for tasks submitted via the HTTP API
`POST /tasks`, and will keep running until Ctrl+C. Multiple tasks are
executed sequentially.

The config is split into "Global config" and "Task config", which shares
the same structure for compatibility and simplicity.

The pprof-port setting has been deprecated in favor of status-addr, for
compatibility with other tools.

* lightning,config: cover some of the new code

* lightning: added `GET /tasks` API to get number of queued tasks

* *: addressed comments

* config,lightning: use a linked hash map to store queued configs

Changed /task to return JSON.

This is to prepare for an API removing a queued task, and also to remove
the artificial task queue size limit.

* config: change TaskID to record the current timestamp
  • Loading branch information
kennytm authored Jun 18, 2019
1 parent 3256747 commit 9fec239
Show file tree
Hide file tree
Showing 11 changed files with 782 additions and 258 deletions.
72 changes: 22 additions & 50 deletions cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,62 +38,34 @@ func main() {
}

func run() error {
cfg := config.NewConfig()
fs := flag.NewFlagSet("lightning-ctl", flag.ExitOnError)

fs.StringVar(&cfg.ConfigFile, "config", "", "tidb-lightning configuration file")
var (
compact *bool
mode, flagImportEngine, flagCleanupEngine *string
cpRemove, cpErrIgnore, cpErrDestroy, cpDump *string

logLevel := fs.String("L", "", `log level: info, debug, warn, error, fatal (default "info")`)
logFilePath := fs.String("log-file", "", "log file path")
tidbHost := fs.String("tidb-host", "", "TiDB server host")
tidbPort := fs.Int("tidb-port", 0, "TiDB server port (default 4000)")
tidbUser := fs.String("tidb-user", "", "TiDB user name to connect")
pdAddr := fs.String("pd-urls", "", "PD endpoint address")
importerAddr := fs.String("importer", "", "address (host:port) to connect to tikv-importer")
fsUsage func()
)

compact := fs.Bool("compact", false, "do manual compaction on the target cluster")
mode := fs.String("switch-mode", "", "switch tikv into import mode or normal mode, values can be ['import', 'normal']")
globalCfg := config.Must(config.LoadGlobalConfig(os.Args[1:], func(fs *flag.FlagSet) {
compact = fs.Bool("compact", false, "do manual compaction on the target cluster")
mode = fs.String("switch-mode", "", "switch tikv into import mode or normal mode, values can be ['import', 'normal']")

flagImportEngine := fs.String("import-engine", "", "manually import a closed engine (value can be '`db`.`table`:123' or a UUID")
flagCleanupEngine := fs.String("cleanup-engine", "", "manually delete a closed engine")
flagImportEngine = fs.String("import-engine", "", "manually import a closed engine (value can be '`db`.`table`:123' or a UUID")
flagCleanupEngine = fs.String("cleanup-engine", "", "manually delete a closed engine")

cpRemove := fs.String("checkpoint-remove", "", "remove the checkpoint associated with the given table (value can be 'all' or '`db`.`table`')")
cpErrIgnore := fs.String("checkpoint-error-ignore", "", "ignore errors encoutered previously on the given table (value can be 'all' or '`db`.`table`'); may corrupt this table if used incorrectly")
cpErrDestroy := fs.String("checkpoint-error-destroy", "", "deletes imported data with table which has an error before (value can be 'all' or '`db`.`table`')")
cpDump := fs.String("checkpoint-dump", "", "dump the checkpoint information as two CSV files in the given folder")
cpRemove = fs.String("checkpoint-remove", "", "remove the checkpoint associated with the given table (value can be 'all' or '`db`.`table`')")
cpErrIgnore = fs.String("checkpoint-error-ignore", "", "ignore errors encoutered previously on the given table (value can be 'all' or '`db`.`table`'); may corrupt this table if used incorrectly")
cpErrDestroy = fs.String("checkpoint-error-destroy", "", "deletes imported data with table which has an error before (value can be 'all' or '`db`.`table`')")
cpDump = fs.String("checkpoint-dump", "", "dump the checkpoint information as two CSV files in the given folder")

err := fs.Parse(os.Args[1:])
if err == nil {
err = cfg.Load()
}
if err != nil {
return errors.Trace(err)
}
fsUsage = fs.Usage
}))

if *logLevel != "" {
cfg.App.Config.Level = *logLevel
}
if *logFilePath != "" {
cfg.App.Config.File = *logFilePath
}
if *tidbHost != "" {
cfg.TiDB.Host = *tidbHost
}
if *tidbPort != 0 {
cfg.TiDB.Port = *tidbPort
}
if *tidbUser != "" {
cfg.TiDB.User = *tidbUser
}
if *pdAddr != "" {
cfg.TiDB.PdAddr = *pdAddr
}
if *importerAddr != "" {
cfg.TikvImporter.Addr = *importerAddr
cfg := config.NewConfig()
if err := cfg.LoadFromGlobal(globalCfg); err != nil {
return err
}

err = cfg.Adjust()
if err != nil {
if err := cfg.Adjust(); err != nil {
return err
}

Expand Down Expand Up @@ -125,7 +97,7 @@ func run() error {
return errors.Trace(checkpointDump(ctx, cfg, *cpDump))
}

fs.Usage()
fsUsage()
return nil
}

Expand Down
31 changes: 9 additions & 22 deletions cmd/tidb-lightning/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,20 @@
package main

import (
"flag"
"fmt"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"

"github.com/pingcap/errors"
"github.com/pingcap/tidb-lightning/lightning"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/log"
plan "github.com/pingcap/tidb/planner/core"
"go.uber.org/zap"
)

func setGlobalVars() {
// hardcode it
plan.SetPreparedPlanCache(true)
plan.PreparedPlanCacheCapacity = 10
}

func main() {
setGlobalVars()

cfg, err := config.LoadConfig(os.Args[1:])
switch errors.Cause(err) {
case nil:
case flag.ErrHelp:
os.Exit(0)
default:
fmt.Println("Failed to parse command flags: ", err)
os.Exit(1)
}

cfg := config.Must(config.LoadGlobalConfig(os.Args[1:], nil))
app := lightning.New(cfg)

sc := make(chan os.Signal, 1)
Expand All @@ -63,7 +43,14 @@ func main() {
app.Stop()
}()

err = app.Run()
go app.Serve()

var err error
if cfg.App.ServerMode {
err = app.RunServer()
} else {
err = app.RunOnce()
}
logger := log.L()
if err != nil {
logger.Error("tidb lightning encountered error", zap.Error(err))
Expand Down
10 changes: 7 additions & 3 deletions lightning/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/go-sql-driver/mysql"
. "github.com/pingcap/check"
Expand Down Expand Up @@ -65,18 +66,21 @@ func (s *utilSuite) TestGetJSON(c *C) {
handle(res, req)
}))
defer testServer.Close()

client := &http.Client{Timeout: time.Second}

response := TestPayload{}
err := common.GetJSON(http.DefaultClient, "http://not-exists", &response)
err := common.GetJSON(client, "http://not-exists", &response)
c.Assert(err, NotNil)
err = common.GetJSON(http.DefaultClient, testServer.URL, &response)
err = common.GetJSON(client, testServer.URL, &response)
c.Assert(err, IsNil)
c.Assert(request, DeepEquals, response)

// Mock `StatusNoContent` response
handle = func(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(http.StatusNoContent)
}
err = common.GetJSON(http.DefaultClient, testServer.URL, &response)
err = common.GetJSON(client, testServer.URL, &response)
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, ".*http status code != 200.*")
}
Expand Down
117 changes: 22 additions & 95 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ package config

import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"net/http"
"runtime"
"strings"
Expand Down Expand Up @@ -50,7 +48,6 @@ type DBStore struct {
StatusPort int `toml:"status-port" json:"status-port"`
PdAddr string `toml:"pd-addr" json:"pd-addr"`
StrSQLMode string `toml:"sql-mode" json:"sql-mode"`
LogLevel string `toml:"log-level" json:"log-level"`

SQLMode mysql.SQLMode `toml:"-" json:"-"`

Expand All @@ -61,21 +58,18 @@ type DBStore struct {
}

type Config struct {
TaskID int64 `toml:"-" json:"id"`

App Lightning `toml:"lightning" json:"lightning"`
TiDB DBStore `toml:"tidb" json:"tidb"`

// not implemented yet.
// ProgressStore DBStore `toml:"progress-store" json:"progress-store"`
Checkpoint Checkpoint `toml:"checkpoint" json:"checkpoint"`
Mydumper MydumperRuntime `toml:"mydumper" json:"mydumper"`
BWList *filter.Rules `toml:"black-white-list" json:"black-white-list"`
TikvImporter TikvImporter `toml:"tikv-importer" json:"tikv-importer"`
PostRestore PostRestore `toml:"post-restore" json:"post-restore"`
Cron Cron `toml:"cron" json:"cron"`
Routes []*router.TableRule `toml:"routes" json:"routes"`

// command line flags
ConfigFile string `json:"config-file"`
}

func (c *Config) String() string {
Expand All @@ -87,12 +81,10 @@ func (c *Config) String() string {
}

type Lightning struct {
log.Config
TableConcurrency int `toml:"table-concurrency" json:"table-concurrency"`
IndexConcurrency int `toml:"index-concurrency" json:"index-concurrency"`
RegionConcurrency int `toml:"region-concurrency" json:"region-concurrency"`
IOConcurrency int `toml:"io-concurrency" json:"io-concurrency"`
ProfilePort int `toml:"pprof-port" json:"pprof-port"`
CheckRequirements bool `toml:"check-requirements" json:"check-requirements"`
}

Expand Down Expand Up @@ -171,7 +163,6 @@ func NewConfig() *Config {
Host: "127.0.0.1",
User: "root",
StatusPort: 10080,
LogLevel: "error",
StrSQLMode: mysql.DefaultSQLMode,
BuildStatsConcurrency: 20,
DistSQLScanConcurrency: 100,
Expand All @@ -186,6 +177,7 @@ func NewConfig() *Config {
ReadBlockSize: ReadBlockSize,
CSV: CSVConfig{
Separator: ",",
Delimiter: `"`,
},
},
PostRestore: PostRestore{
Expand All @@ -194,89 +186,30 @@ func NewConfig() *Config {
}
}

func LoadConfig(args []string) (*Config, error) {
cfg := NewConfig()

fs := flag.NewFlagSet("lightning", flag.ContinueOnError)

// if both `-c` and `-config` are specified, the last one in the command line will take effect.
// the default value is assigned immediately after the StringVar() call,
// so it is fine to not give any default value for `-c`, to keep the `-h` page clean.
fs.StringVar(&cfg.ConfigFile, "c", "", "(deprecated alias of -config)")
fs.StringVar(&cfg.ConfigFile, "config", "", "tidb-lightning configuration file")
printVersion := fs.Bool("V", false, "print version of lightning")

logLevel := fs.String("L", "", `log level: info, debug, warn, error, fatal (default "info")`)
logFilePath := fs.String("log-file", "", "log file path")
tidbHost := fs.String("tidb-host", "", "TiDB server host")
tidbPort := fs.Int("tidb-port", 0, "TiDB server port (default 4000)")
tidbUser := fs.String("tidb-user", "", "TiDB user name to connect")
tidbStatusPort := fs.Int("tidb-status", 0, "TiDB server status port (default 10080)")
pdAddr := fs.String("pd-urls", "", "PD endpoint address")
dataSrcPath := fs.String("d", "", "Directory of the dump to import")
importerAddr := fs.String("importer", "", "address (host:port) to connect to tikv-importer")

if err := fs.Parse(args); err != nil {
return nil, errors.Trace(err)
}
if *printVersion {
fmt.Println(common.GetRawInfo())
return nil, flag.ErrHelp
}

if err := cfg.Load(); err != nil {
return nil, errors.Trace(err)
}

if *logLevel != "" {
cfg.App.Config.Level = *logLevel
}
if *logFilePath != "" {
cfg.App.Config.File = *logFilePath
}
if *tidbHost != "" {
cfg.TiDB.Host = *tidbHost
}
if *tidbPort != 0 {
cfg.TiDB.Port = *tidbPort
}
if *tidbStatusPort != 0 {
cfg.TiDB.StatusPort = *tidbStatusPort
}
if *tidbUser != "" {
cfg.TiDB.User = *tidbUser
}
if *pdAddr != "" {
cfg.TiDB.PdAddr = *pdAddr
}
if *dataSrcPath != "" {
cfg.Mydumper.SourceDir = *dataSrcPath
}
if *importerAddr != "" {
cfg.TikvImporter.Addr = *importerAddr
// LoadFromGlobal resets the current configuration to the global settings.
func (cfg *Config) LoadFromGlobal(global *GlobalConfig) error {
if err := cfg.LoadFromTOML(global.ConfigFileContent); err != nil {
return err
}

if err := cfg.Adjust(); err != nil {
return nil, err
}
cfg.TiDB.Host = global.TiDB.Host
cfg.TiDB.Port = global.TiDB.Port
cfg.TiDB.User = global.TiDB.User
cfg.TiDB.StatusPort = global.TiDB.StatusPort
cfg.TiDB.PdAddr = global.TiDB.PdAddr
cfg.Mydumper.SourceDir = global.Mydumper.SourceDir
cfg.TikvImporter.Addr = global.TikvImporter.Addr

return cfg, nil
return nil
}

func (cfg *Config) Load() error {
// use standard config if unspecified.
if cfg.ConfigFile == "" {
return nil
}

data, err := ioutil.ReadFile(cfg.ConfigFile)
if err != nil {
return errors.Trace(err)
}
if err = toml.Unmarshal(data, cfg); err != nil {
return errors.Trace(err)
}
// LoadFromTOML overwrites the current configuration by the TOML data
func (cfg *Config) LoadFromTOML(data []byte) error {
return errors.Trace(toml.Unmarshal(data, cfg))
}

// Adjust fixes the invalid or unspecified settings to reasonable valid values.
func (cfg *Config) Adjust() error {
// Reject problematic CSV configurations.
csv := &cfg.Mydumper.CSV
if len(csv.Separator) != 1 {
Expand All @@ -300,6 +233,7 @@ func (cfg *Config) Load() error {
}
}

var err error
cfg.TiDB.SQLMode, err = mysql.GetSQLMode(cfg.TiDB.StrSQLMode)
if err != nil {
return errors.Annotate(err, "invalid config: `mydumper.tidb.sql_mode` must be a valid SQL_MODE")
Expand All @@ -314,13 +248,6 @@ func (cfg *Config) Load() error {
}
}

return nil
}

// Adjust fixes the invalid or unspecified settings to reasonable valid values.
func (cfg *Config) Adjust() error {
cfg.App.Config.Adjust()

// automatically determine the TiDB port & PD address from TiDB settings
if cfg.TiDB.Port <= 0 || len(cfg.TiDB.PdAddr) == 0 {
resp, err := http.Get(fmt.Sprintf("http://%s:%d/settings", cfg.TiDB.Host, cfg.TiDB.StatusPort))
Expand Down
Loading

0 comments on commit 9fec239

Please sign in to comment.