Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/pingcap/tidb-lightning in…
Browse files Browse the repository at this point in the history
…to pd-schedule
  • Loading branch information
glorv committed Oct 12, 2020
2 parents 3d20c40 + aa029d8 commit 0e90edf
Show file tree
Hide file tree
Showing 26 changed files with 953 additions and 243 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

[![Build Status](https://internal.pingcap.net/idc-jenkins/job/test_lightning_master/badge/icon)](https://internal.pingcap.net/idc-jenkins/job/test_lightning_master/)
[![Coverage Status](https://coveralls.io/repos/github/pingcap/tidb-lightning/badge.svg?branch=master)](https://coveralls.io/github/pingcap/tidb-lightning?branch=master)
[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fpingcap%2Ftidb-lightning.svg?type=shield)](https://app.fossa.com/projects/git%2Bgithub.com%2Fpingcap%2Ftidb-lightning?ref=badge_shield)

**TiDB-Lightning** is a tool for fast full import of large amounts of data into a TiDB cluster.
Currently, we support reading SQL dump exported via mydumper.
Expand All @@ -19,3 +20,6 @@ for details on submitting patches and the contribution workflow.
## License

TiDB-Lightning is under the Apache 2.0 license. See the [LICENSE](./LICENSE) file for details.


[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fpingcap%2Ftidb-lightning.svg?type=large)](https://app.fossa.com/projects/git%2Bgithub.com%2Fpingcap%2Ftidb-lightning?ref=badge_large)
25 changes: 11 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/pingcap/tidb-lightning
go 1.13

require (
cloud.google.com/go/bigquery v1.4.0 // indirect
github.com/BurntSushi/toml v0.3.1
github.com/DATA-DOG/go-sqlmock v1.4.1
github.com/carlmjohnson/flagext v0.0.11
Expand All @@ -12,35 +11,33 @@ require (
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/gogo/protobuf v1.3.1
github.com/golang/mock v1.4.3
github.com/golang/mock v1.4.4
github.com/google/go-cmp v0.5.0 // indirect
github.com/joho/sqltocsv v0.0.0-20190824231449-5650f27fd5b6
github.com/juju/loggo v0.0.0-20180524022052-584905176618 // indirect
github.com/onsi/ginkgo v1.13.0 // indirect
github.com/pingcap/br v0.0.0-20200928064155-3bf2cdee5b85
github.com/pingcap/br v0.0.0-20201009140310-ed2b14378e3f
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20200902104258-eba4f1d8f6de
github.com/pingcap/errors v0.11.5-0.20200917111840-a15ef68f753d
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20200910095337-6b893f12be43
github.com/pingcap/kvproto v0.0.0-20200927025644-73dc27044686
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
github.com/pingcap/parser v0.0.0-20200909072241-6dac7bb703e2
github.com/pingcap/tidb v1.1.0-beta.0.20200910052409-5d52a34b2476
github.com/pingcap/parser v0.0.0-20200921041333-cd2542b7a8a2
github.com/pingcap/tidb v1.1.0-beta.0.20200921082409-501466fb690d
github.com/pingcap/tidb-tools v4.0.5-0.20200820092506-34ea90c93237+incompatible
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/satori/go.uuid v1.2.0
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tikv/pd v1.1.0-beta.0.20200820084926-bcfa77a7a593
github.com/xitongsys/parquet-go v1.5.2
github.com/xitongsys/parquet-go-source v0.0.0-20190524061010-2b72cbee77d5
go.opencensus.io v0.22.3 // indirect
github.com/tikv/pd v1.1.0-beta.0.20200910042021-254d1345be09
github.com/xitongsys/parquet-go v1.5.4-0.20201010004835-f51647f24120
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc
golang.org/x/net v0.0.0-20200904194848-62affa334b73
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/text v0.3.3
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
google.golang.org/grpc v1.26.0
google.golang.org/grpc v1.27.1
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
modernc.org/mathutil v1.0.0
)
129 changes: 110 additions & 19 deletions go.sum

Large diffs are not rendered by default.

31 changes: 26 additions & 5 deletions lightning/backend/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,13 +878,15 @@ func (local *local) WriteAndIngestPairs(

for _, meta := range metas {
var err error
for i := 0; i < maxRetryTimes; i++ {
errCnt := 0
for errCnt < maxRetryTimes {
log.L().Debug("ingest meta", zap.Reflect("meta", meta))
var resp *sst.IngestResponse
resp, err = local.Ingest(ctx, meta, region)
if err != nil {
log.L().Warn("ingest failed", zap.Error(err), zap.Reflect("meta", meta),
zap.Reflect("region", region))
errCnt++
continue
}
failpoint.Inject("FailIngestMeta", func(val failpoint.Value) {
Expand All @@ -899,7 +901,7 @@ func (local *local) WriteAndIngestPairs(
})
var needRetry bool
var newRegion *split.RegionInfo
needRetry, newRegion, err = isIngestRetryable(resp, region, meta)
needRetry, newRegion, err = local.isIngestRetryable(ctx, resp, region, meta)
if err == nil {
// ingest next meta
break
Expand All @@ -921,7 +923,7 @@ func (local *local) WriteAndIngestPairs(
}
}
if err != nil {
log.L().Error("all retry ingest failed", zap.Reflect("ingest meta", meta), zap.Error(err))
log.L().Warn("all retry ingest failed", zap.Reflect("ingest meta", meta), zap.Error(err))
return remainRange, errors.Trace(err)
}
}
Expand Down Expand Up @@ -1114,7 +1116,12 @@ func (local *local) NewEncoder(tbl table.Table, options *SessionOptions) Encoder
return NewTableKVEncoder(tbl, options)
}

func isIngestRetryable(resp *sst.IngestResponse, region *split.RegionInfo, meta *sst.SSTMeta) (bool, *split.RegionInfo, error) {
func (local *local) isIngestRetryable(
ctx context.Context,
resp *sst.IngestResponse,
region *split.RegionInfo,
meta *sst.SSTMeta,
) (bool, *split.RegionInfo, error) {
if resp.GetError() == nil {
return false, nil, nil
}
Expand All @@ -1127,8 +1134,22 @@ func isIngestRetryable(resp *sst.IngestResponse, region *split.RegionInfo, meta
Leader: newLeader,
Region: region.Region,
}
return true, newRegion, errors.Errorf("not leader: %s", errPb.GetMessage())
} else {
var err error
for i := 0; ; i++ {
newRegion, err = local.splitCli.GetRegion(ctx, region.Region.GetStartKey())
if err != nil {
return false, nil, errors.Trace(err)
}
if newRegion != nil {
break
}
log.L().Warn("get region by key return nil, will retry", zap.Reflect("region", region),
zap.Int("retry", i))
time.Sleep(time.Second)
}
}
return true, newRegion, errors.Errorf("not leader: %s", errPb.GetMessage())
case errPb.EpochNotMatch != nil:
if currentRegions := errPb.GetEpochNotMatch().GetCurrentRegions(); currentRegions != nil {
var currentRegion *metapb.Region
Expand Down
41 changes: 24 additions & 17 deletions lightning/backend/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,36 +363,43 @@ func (be *tidbBackend) FetchRemoteTableModels(schemaName string) (tables []*mode
}
err = s.Transact(context.Background(), "fetch table columns", func(c context.Context, tx *sql.Tx) error {
rows, e := tx.Query(`
SELECT table_name, group_concat(column_name SEPARATOR '\n')
SELECT table_name, column_name
FROM information_schema.columns
WHERE table_schema = ?
GROUP BY table_name;
ORDER BY table_name;
`, schemaName)
if e != nil {
return e
}
defer rows.Close()

var (
curTableName string
curColOffset int
curTable *model.TableInfo
)
for rows.Next() {
var tableName, columnNamesConcat string
if e := rows.Scan(&tableName, &columnNamesConcat); e != nil {
var tableName, columnName string
if e := rows.Scan(&tableName, &columnName); e != nil {
return e
}
columnNames := strings.Split(columnNamesConcat, "\n")
columns := make([]*model.ColumnInfo, 0, len(columnNames))
for i, columnName := range columnNames {
columns = append(columns, &model.ColumnInfo{
Name: model.NewCIStr(columnName),
Offset: i,
State: model.StatePublic,
})
if tableName != curTableName {
curTable = &model.TableInfo{
Name: model.NewCIStr(tableName),
State: model.StatePublic,
PKIsHandle: true,
}
tables = append(tables, curTable)
curTableName = tableName
curColOffset = 0
}
tables = append(tables, &model.TableInfo{
Name: model.NewCIStr(tableName),
Columns: columns,
State: model.StatePublic,
PKIsHandle: true,

curTable.Columns = append(curTable.Columns, &model.ColumnInfo{
Name: model.NewCIStr(columnName),
Offset: curColOffset,
State: model.StatePublic,
})
curColOffset++
}
return rows.Err()
})
Expand Down
35 changes: 23 additions & 12 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,16 +384,12 @@ func (cfg *Config) LoadFromTOML(data []byte) error {
func (cfg *Config) Adjust() error {
// Reject problematic CSV configurations.
csv := &cfg.Mydumper.CSV
if len(csv.Separator) != 1 {
return errors.New("invalid config: `mydumper.csv.separator` must be exactly one byte long")
if len(csv.Separator) == 0 {
return errors.New("invalid config: `mydumper.csv.separator` must not be empty")
}

if len(csv.Delimiter) > 1 {
return errors.New("invalid config: `mydumper.csv.delimiter` must be one byte long or empty")
}

if csv.Separator == csv.Delimiter {
return errors.New("invalid config: cannot use the same character for both CSV delimiter and separator")
if len(csv.Delimiter) > 0 && (strings.HasPrefix(csv.Separator, csv.Delimiter) || strings.HasPrefix(csv.Delimiter, csv.Separator)) {
return errors.New("invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other")
}

if csv.BackslashEscape {
Expand Down Expand Up @@ -569,10 +565,25 @@ func (cfg *Config) Adjust() error {
}
}

u, err := url.Parse(cfg.Mydumper.SourceDir)
if err != nil {
return errors.Trace(err)
var u *url.URL

// An absolute Windows path like "C:\Users\XYZ" would be interpreted as
// an URL with scheme "C" and opaque data "\Users\XYZ".
// Therefore, we only perform URL parsing if we are sure the path is not
// an absolute Windows path.
// Here we use the `filepath.VolumeName` which can identify the "C:" part
// out of the path. On Linux this method always return an empty string.
// On Windows, the drive letter can only be single letters from "A:" to "Z:",
// so this won't mistake "S3:" as a Windows path.
if len(filepath.VolumeName(cfg.Mydumper.SourceDir)) == 0 {
u, err = url.Parse(cfg.Mydumper.SourceDir)
if err != nil {
return errors.Trace(err)
}
} else {
u = &url.URL{}
}

// convert path and relative path to a valid file url
if u.Scheme == "" {
if !common.IsDirExists(cfg.Mydumper.SourceDir) {
Expand All @@ -582,7 +593,7 @@ func (cfg *Config) Adjust() error {
if err != nil {
return errors.Annotatef(err, "covert data-source-dir '%s' to absolute path failed", cfg.Mydumper.SourceDir)
}
cfg.Mydumper.SourceDir = fmt.Sprintf("file://%s", absPath)
cfg.Mydumper.SourceDir = "file://" + filepath.ToSlash(absPath)
u.Path = absPath
u.Scheme = "file"
}
Expand Down
24 changes: 17 additions & 7 deletions lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,23 @@ func (s *configTestSuite) TestInvalidCSV(c *C) {
[mydumper.csv]
separator = ''
`,
err: "invalid config: `mydumper.csv.separator` must be exactly one byte long",
err: "invalid config: `mydumper.csv.separator` must not be empty",
},
{
input: `
[mydumper.csv]
separator = 'hello'
delimiter = 'hel'
`,
err: "invalid config: `mydumper.csv.separator` must be exactly one byte long",
err: "invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other",
},
{
input: `
[mydumper.csv]
separator = 'hel'
delimiter = 'hello'
`,
err: "invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other",
},
{
input: `
Expand All @@ -297,7 +306,7 @@ func (s *configTestSuite) TestInvalidCSV(c *C) {
[mydumper.csv]
separator = ','
`,
err: "invalid config: `mydumper.csv.separator` must be exactly one byte long",
err: "",
},
{
input: `
Expand All @@ -311,7 +320,7 @@ func (s *configTestSuite) TestInvalidCSV(c *C) {
[mydumper.csv]
delimiter = 'hello'
`,
err: "invalid config: `mydumper.csv.delimiter` must be one byte long or empty",
err: "",
},
{
input: `
Expand All @@ -324,17 +333,18 @@ func (s *configTestSuite) TestInvalidCSV(c *C) {
{
input: `
[mydumper.csv]
delimiter = '“'
separator = '\s'
delimiter = '\d'
`,
err: "invalid config: `mydumper.csv.delimiter` must be one byte long or empty",
err: "",
},
{
input: `
[mydumper.csv]
separator = '|'
delimiter = '|'
`,
err: "invalid config: cannot use the same character for both CSV delimiter and separator",
err: "invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other",
},
{
input: `
Expand Down
35 changes: 33 additions & 2 deletions lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
Expand Down Expand Up @@ -57,6 +58,7 @@ type Lightning struct {
shutdown context.CancelFunc
server http.Server
serverAddr net.Addr
serverLock sync.Mutex

cancelLock sync.Mutex
curTask *config.Config
Expand Down Expand Up @@ -88,10 +90,37 @@ func New(globalCfg *config.GlobalConfig) *Lightning {
}

func (l *Lightning) GoServe() error {
if len(l.globalCfg.App.StatusAddr) == 0 {
handleSigUsr1(func() {
l.serverLock.Lock()
statusAddr := l.globalCfg.App.StatusAddr
shouldStartServer := len(statusAddr) == 0
if shouldStartServer {
l.globalCfg.App.StatusAddr = ":"
}
l.serverLock.Unlock()

if shouldStartServer {
// open a random port and start the server if SIGUSR1 is received.
if err := l.goServe(":", os.Stderr); err != nil {
log.L().Warn("failed to start HTTP server", log.ShortError(err))
}
} else {
// just prints the server address if it is already started.
log.L().Info("already started HTTP server", zap.Stringer("address", l.serverAddr))
}
})

l.serverLock.Lock()
statusAddr := l.globalCfg.App.StatusAddr
l.serverLock.Unlock()

if len(statusAddr) == 0 {
return nil
}
return l.goServe(statusAddr, ioutil.Discard)
}

func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
mux := http.NewServeMux()
mux.Handle("/", http.RedirectHandler("/web/", http.StatusFound))
mux.Handle("/metrics", promhttp.Handler())
Expand Down Expand Up @@ -122,11 +151,13 @@ func (l *Lightning) GoServe() error {
},
})))

listener, err := net.Listen("tcp", l.globalCfg.App.StatusAddr)
listener, err := net.Listen("tcp", statusAddr)
if err != nil {
return err
}
l.serverAddr = listener.Addr()
log.L().Info("starting HTTP server", zap.Stringer("address", l.serverAddr))
fmt.Fprintln(realAddrWriter, "started HTTP server on", l.serverAddr)
l.server.Handler = mux
listener = l.globalTLS.WrapListener(listener)

Expand Down
Loading

0 comments on commit 0e90edf

Please sign in to comment.