Skip to content

Commit

Permalink
Merge branch 'master' into issue-27177-staging-3
Browse files Browse the repository at this point in the history
  • Loading branch information
w169q169 authored Sep 23, 2021
2 parents 13cee58 + f2cf4cc commit d2a61dd
Show file tree
Hide file tree
Showing 50 changed files with 706 additions and 518 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/bug-closed.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
if: |
contains(github.event.issue.labels.*.name, 'type/bug') &&
!(contains(join(github.event.issue.labels.*.name, ', '), 'affects-') &&
contains(join(github.event.issue.labels.*.name, ', '), 'backport-'))
contains(join(github.event.issue.labels.*.name, ', '), 'fixes-'))
runs-on: ubuntu-latest
permissions:
issues: write
Expand All @@ -25,5 +25,4 @@ jobs:
with:
issue-number: ${{ github.event.issue.number }}
body: |
Please check whether the issue should be labeled with 'affects-x.y' or 'backport-x.y.z',
and then remove 'needs-more-info' label.
Please check whether the issue should be labeled with 'affects-x.y' or 'fixes-x.y.z', and then remove 'needs-more-info' label.
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ else
CGO_ENABLED=1 $(GOBUILD) $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG)' -o '$(TARGET)' tidb-server/main.go
endif

server_debug:
ifeq ($(TARGET), "")
CGO_ENABLED=1 $(GOBUILD) -gcflags="all=-N -l" $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG)' -o bin/tidb-server-debug tidb-server/main.go
else
CGO_ENABLED=1 $(GOBUILD) -gcflags="all=-N -l" $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG)' -o '$(TARGET)' tidb-server/main.go
endif

server_check:
ifeq ($(TARGET), "")
$(GOBUILD) $(RACE_FLAG) -ldflags '$(CHECK_LDFLAGS)' -o bin/tidb-server tidb-server/main.go
Expand Down
1 change: 0 additions & 1 deletion br/pkg/lightning/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
const (
// mydumper
ReadBlockSize ByteSize = 64 * units.KiB
MinRegionSize ByteSize = 256 * units.MiB
MaxRegionSize ByteSize = 256 * units.MiB
SplitRegionSize ByteSize = 96 * units.MiB
MaxSplitRegionSizeRatio int = 10
Expand Down
11 changes: 10 additions & 1 deletion br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Lightning struct {
server http.Server
serverAddr net.Addr
serverLock sync.Mutex
status restore.LightningStatus

cancelLock sync.Mutex
curTask *config.Config
Expand Down Expand Up @@ -310,7 +311,8 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.
web.BroadcastInitProgress(dbMetas)

var procedure *restore.Controller
procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, s, g)

procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, &l.status, s, g)
if err != nil {
log.L().Error("restore failed", log.ShortError(err))
return errors.Trace(err)
Expand All @@ -333,6 +335,13 @@ func (l *Lightning) Stop() {
l.shutdown()
}

// Status return the sum size of file which has been imported to TiKV and the total size of source file.
func (l *Lightning) Status() (finished int64, total int64) {
finished = l.status.FinishedFileSize.Load()
total = l.status.TotalFileSize.Load()
return
}

func writeJSONError(w http.ResponseWriter, code int, prefix string, err error) {
type errorResponse struct {
Error string `json:"error"`
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func InitLogger(cfg *Config, tidbLoglevel string) error {
return nil
}

// SetAppLogger replaces the default logger in this package to given one
func SetAppLogger(l *zap.Logger) {
appLogger = Logger{l.WithOptions(zap.AddStacktrace(zap.DPanicLevel))}
}

// L returns the current logger for Lightning.
func L() Logger {
return appLogger
Expand Down
11 changes: 4 additions & 7 deletions br/pkg/lightning/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@ import (

const (
// states used for the TableCounter labels
TableStatePending = "pending"
TableStateWritten = "written"
TableStateClosed = "closed"
TableStateImported = "imported"
TableStateAlteredAutoInc = "altered_auto_inc"
TableStateChecksum = "checksum"
TableStateCompleted = "completed"
TableStatePending = "pending"
TableStateWritten = "written"
TableStateImported = "imported"
TableStateCompleted = "completed"

// results used for the TableCounter labels
TableResultSuccess = "success"
Expand Down
51 changes: 27 additions & 24 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (rc *Controller) getClusterAvail(ctx context.Context) (uint64, error) {
return clusterAvail, nil
}

// ClusterResource check cluster has enough resource to import data. this test can by skipped.
func (rc *Controller) ClusterResource(ctx context.Context, localSource int64) error {
// clusterResource check cluster has enough resource to import data. this test can by skipped.
func (rc *Controller) clusterResource(ctx context.Context, localSource int64) error {
passed := true
message := "Cluster resources are rich for this import task"
defer func() {
Expand Down Expand Up @@ -167,11 +167,6 @@ func (rc *Controller) ClusterIsAvailable(ctx context.Context) error {
defer func() {
rc.checkTemplate.Collect(Critical, passed, message)
}()
// skip requirement check if explicitly turned off
if !rc.cfg.App.CheckRequirements {
message = "Cluster's available check is skipped by user requirement"
return nil
}
checkCtx := &backend.CheckCtx{
DBMetas: rc.dbMetas,
}
Expand Down Expand Up @@ -314,8 +309,8 @@ func (rc *Controller) checkRegionDistribution(ctx context.Context) error {
return nil
}

// CheckClusterRegion checks cluster if there are too many empty regions or region distribution is unbalanced.
func (rc *Controller) CheckClusterRegion(ctx context.Context) error {
// checkClusterRegion checks cluster if there are too many empty regions or region distribution is unbalanced.
func (rc *Controller) checkClusterRegion(ctx context.Context) error {
err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) {
restoreStarted := false
for _, task := range tasks {
Expand Down Expand Up @@ -390,7 +385,7 @@ func (rc *Controller) HasLargeCSV(dbMetas []*mydump.MDDatabaseMeta) error {
return nil
}

func (rc *Controller) EstimateSourceData(ctx context.Context) (int64, error) {
func (rc *Controller) estimateSourceData(ctx context.Context) (int64, error) {
sourceSize := int64(0)
originSource := int64(0)
bigTableCount := 0
Expand All @@ -404,21 +399,32 @@ func (rc *Controller) EstimateSourceData(ctx context.Context) (int64, error) {
for _, tbl := range db.Tables {
tableInfo, ok := info.Tables[tbl.Name]
if ok {
if err := rc.SampleDataFromTable(ctx, db.Name, tbl, tableInfo.Core); err != nil {
return sourceSize, errors.Trace(err)
}
sourceSize += int64(float64(tbl.TotalSize) * tbl.IndexRatio)
originSource += tbl.TotalSize
if tbl.TotalSize > int64(config.DefaultBatchSize)*2 {
bigTableCount += 1
if !tbl.IsRowOrdered {
unSortedTableCount += 1
// Do not sample small table because there may a large number of small table and it will take a long
// time to sample data for all of them.
if tbl.TotalSize < int64(config.SplitRegionSize) {
sourceSize += tbl.TotalSize
tbl.IndexRatio = 1.0
tbl.IsRowOrdered = false
} else {
if err := rc.sampleDataFromTable(ctx, db.Name, tbl, tableInfo.Core); err != nil {
return sourceSize, errors.Trace(err)
}
sourceSize += int64(float64(tbl.TotalSize) * tbl.IndexRatio)
if tbl.TotalSize > int64(config.DefaultBatchSize)*2 {
bigTableCount += 1
if !tbl.IsRowOrdered {
unSortedTableCount += 1
}
}
}
tableCount += 1
}
}
}
if rc.status != nil {
rc.status.TotalFileSize.Store(originSource)
}

// Do not import with too large concurrency because these data may be all unsorted.
if bigTableCount > 0 && unSortedTableCount > 0 {
Expand All @@ -429,8 +435,8 @@ func (rc *Controller) EstimateSourceData(ctx context.Context) (int64, error) {
return sourceSize, nil
}

// LocalResource checks the local node has enough resources for this import when local backend enabled;
func (rc *Controller) LocalResource(ctx context.Context, sourceSize int64) error {
// localResource checks the local node has enough resources for this import when local backend enabled;
func (rc *Controller) localResource(sourceSize int64) error {
if rc.isSourceInLocal() {
sourceDir := strings.TrimPrefix(rc.cfg.Mydumper.SourceDir, storage.LocalURIPrefix)
same, err := common.SameDisk(sourceDir, rc.cfg.TikvImporter.SortedKVDir)
Expand All @@ -449,9 +455,6 @@ func (rc *Controller) LocalResource(ctx context.Context, sourceSize int64) error
return errors.Trace(err)
}
localAvailable := storageSize.Available
if err = rc.taskMgr.InitTask(ctx, sourceSize); err != nil {
return errors.Trace(err)
}

var message string
var passed bool
Expand Down Expand Up @@ -732,7 +735,7 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
return msgs, nil
}

func (rc *Controller) SampleDataFromTable(ctx context.Context, dbName string, tableMeta *mydump.MDTableMeta, tableInfo *model.TableInfo) error {
func (rc *Controller) sampleDataFromTable(ctx context.Context, dbName string, tableMeta *mydump.MDTableMeta, tableInfo *model.TableInfo) error {
if len(tableMeta.DataFiles) == 0 {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/check_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *SimpleTemplate) Collect(t CheckType, passed bool, msg string) {
}

func (c *SimpleTemplate) Success() bool {
return c.warnFailedCount+c.criticalFailedCount == 0
return c.criticalFailedCount == 0
}

func (c *SimpleTemplate) FailedCount(t CheckType) int {
Expand Down
64 changes: 40 additions & 24 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,22 +257,30 @@ type Controller struct {
diskQuotaLock *diskQuotaLock
diskQuotaState atomic.Int32
compactState atomic.Int32
status *LightningStatus
}

type LightningStatus struct {
FinishedFileSize atomic.Int64
TotalFileSize atomic.Int64
}

func NewRestoreController(
ctx context.Context,
dbMetas []*mydump.MDDatabaseMeta,
cfg *config.Config,
status *LightningStatus,
s storage.ExternalStorage,
g glue.Glue,
) (*Controller, error) {
return NewRestoreControllerWithPauser(ctx, dbMetas, cfg, s, DeliverPauser, g)
return NewRestoreControllerWithPauser(ctx, dbMetas, cfg, status, s, DeliverPauser, g)
}

func NewRestoreControllerWithPauser(
ctx context.Context,
dbMetas []*mydump.MDDatabaseMeta,
cfg *config.Config,
status *LightningStatus,
s storage.ExternalStorage,
pauser *common.Pauser,
g glue.Glue,
Expand Down Expand Up @@ -379,6 +387,7 @@ func NewRestoreControllerWithPauser(
store: s,
metaMgrBuilder: metaBuilder,
diskQuotaLock: newDiskQuotaLock(),
status: status,
taskMgr: nil,
}

Expand Down Expand Up @@ -1716,18 +1725,26 @@ func (rc *Controller) isLocalBackend() bool {
// 4. Lightning configuration
// before restore tables start.
func (rc *Controller) preCheckRequirements(ctx context.Context) error {
if err := rc.ClusterIsAvailable(ctx); err != nil {
return errors.Trace(err)
}
if rc.cfg.App.CheckRequirements {
if err := rc.ClusterIsAvailable(ctx); err != nil {
return errors.Trace(err)
}

if err := rc.StoragePermission(ctx); err != nil {
return errors.Trace(err)
if err := rc.StoragePermission(ctx); err != nil {
return errors.Trace(err)
}
}

if err := rc.metaMgrBuilder.Init(ctx); err != nil {
return err
}
taskExist := false

// We still need to sample source data even if this task has existed, because we need to judge whether the
// source is in order as row key to decide how to sort local data.
source, err := rc.estimateSourceData(ctx)
if err != nil {
return errors.Trace(err)
}
if rc.isLocalBackend() {
pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr,
rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption())
Expand All @@ -1742,29 +1759,28 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
return errors.Trace(err)
}
if !taskExist {
source, err := rc.EstimateSourceData(ctx)
if err != nil {
if err = rc.taskMgr.InitTask(ctx, source); err != nil {
return errors.Trace(err)
}
err = rc.LocalResource(ctx, source)
if err != nil {
rc.taskMgr.CleanupTask(ctx)
return errors.Trace(err)
}
if err := rc.ClusterResource(ctx, source); err != nil {
rc.taskMgr.CleanupTask(ctx)
return errors.Trace(err)
}
if err := rc.CheckClusterRegion(ctx); err != nil {
return errors.Trace(err)
if rc.cfg.App.CheckRequirements {
err = rc.localResource(source)
if err != nil {
return errors.Trace(err)
}
if err := rc.clusterResource(ctx, source); err != nil {
rc.taskMgr.CleanupTask(ctx)
return errors.Trace(err)
}
if err := rc.checkClusterRegion(ctx); err != nil {
return errors.Trace(err)
}
}
}
}
if rc.tidbGlue.OwnsSQLExecutor() {
// print check info at any time.

if rc.tidbGlue.OwnsSQLExecutor() && rc.cfg.App.CheckRequirements {
fmt.Print(rc.checkTemplate.Output())
if rc.cfg.App.CheckRequirements && !rc.checkTemplate.Success() {
// if check requirements is true, return error.
if !rc.checkTemplate.Success() {
if !taskExist && rc.taskMgr != nil {
rc.taskMgr.CleanupTask(ctx)
}
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1713,7 +1713,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) {
sourceSize += size
return nil
})
err = rc.ClusterResource(ctx, sourceSize)
err = rc.clusterResource(ctx, sourceSize)
c.Assert(err, IsNil)

c.Assert(template.FailedCount(Critical), Equals, ca.expectErrorCount)
Expand Down Expand Up @@ -1835,7 +1835,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) {
cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}}
rc := &Controller{cfg: cfg, tls: tls, taskMgr: mockTaskMetaMgr{}, checkTemplate: template}

err := rc.CheckClusterRegion(context.Background())
err := rc.checkClusterRegion(context.Background())
c.Assert(err, IsNil)
c.Assert(template.FailedCount(Critical), Equals, ca.expectErrorCnt)
c.Assert(template.Success(), Equals, ca.expectResult)
Expand Down Expand Up @@ -1887,7 +1887,7 @@ func (s *tableRestoreSuite) TestCheckHasLargeCSV(c *C) {
{
false,
"(.*)large csv: /testPath file exists(.*)",
false,
true,
1,
[]*mydump.MDDatabaseMeta{
{
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,20 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
dataWorker := rc.closedEngineLimit.Apply()
defer rc.closedEngineLimit.Recycle(dataWorker)
err = tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp)
if rc.status != nil {
for _, chunk := range ecp.Chunks {
rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
}
}
}
if err != nil {
setError(err)
}
}(restoreWorker, engineID, engine)
} else {
for _, chunk := range engine.Chunks {
rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
}
}
}

Expand Down
Loading

0 comments on commit d2a61dd

Please sign in to comment.