Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: do not output pre-check info when disable check-requirements #27934

Merged
merged 10 commits into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ func (rc *Controller) getClusterAvail(ctx context.Context) (uint64, error) {
return clusterAvail, nil
}

// ClusterResource checks whether the cluster has enough resources to import the data. This check can be skipped.
func (rc *Controller) ClusterResource(ctx context.Context, localSource int64) error {
func (rc *Controller) clusterResource(ctx context.Context, localSource int64) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's useful to keep the doc comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

passed := true
message := "Cluster resources are rich for this import task"
defer func() {
Expand Down Expand Up @@ -314,8 +313,7 @@ func (rc *Controller) checkRegionDistribution(ctx context.Context) error {
return nil
}

// CheckClusterRegion checks cluster if there are too many empty regions or region distribution is unbalanced.
func (rc *Controller) CheckClusterRegion(ctx context.Context) error {
func (rc *Controller) checkClusterRegion(ctx context.Context) error {
err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) {
restoreStarted := false
for _, task := range tasks {
Expand Down Expand Up @@ -390,7 +388,7 @@ func (rc *Controller) HasLargeCSV(dbMetas []*mydump.MDDatabaseMeta) error {
return nil
}

func (rc *Controller) EstimateSourceData(ctx context.Context) (int64, error) {
func (rc *Controller) estimateSourceData(ctx context.Context) (int64, error) {
sourceSize := int64(0)
originSource := int64(0)
bigTableCount := 0
Expand All @@ -412,7 +410,7 @@ func (rc *Controller) EstimateSourceData(ctx context.Context) (int64, error) {
tbl.IndexRatio = 1.0
tbl.IsRowOrdered = false
} else {
if err := rc.SampleDataFromTable(ctx, db.Name, tbl, tableInfo.Core); err != nil {
if err := rc.sampleDataFromTable(ctx, db.Name, tbl, tableInfo.Core); err != nil {
return sourceSize, errors.Trace(err)
}
sourceSize += int64(float64(tbl.TotalSize) * tbl.IndexRatio)
Expand All @@ -427,7 +425,9 @@ func (rc *Controller) EstimateSourceData(ctx context.Context) (int64, error) {
}
}
}
rc.status.TotalFileSize.Store(originSource)
if rc.status != nil {
rc.status.TotalFileSize.Store(originSource)
}

// Do not import with too large concurrency because these data may be all unsorted.
if bigTableCount > 0 && unSortedTableCount > 0 {
Expand All @@ -438,8 +438,7 @@ func (rc *Controller) EstimateSourceData(ctx context.Context) (int64, error) {
return sourceSize, nil
}

// LocalResource checks the local node has enough resources for this import when local backend enabled;
func (rc *Controller) LocalResource(sourceSize int64) error {
func (rc *Controller) localResource(sourceSize int64) error {
if rc.isSourceInLocal() {
sourceDir := strings.TrimPrefix(rc.cfg.Mydumper.SourceDir, storage.LocalURIPrefix)
same, err := common.SameDisk(sourceDir, rc.cfg.TikvImporter.SortedKVDir)
Expand Down Expand Up @@ -738,7 +737,7 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
return msgs, nil
}

func (rc *Controller) SampleDataFromTable(ctx context.Context, dbName string, tableMeta *mydump.MDTableMeta, tableInfo *model.TableInfo) error {
func (rc *Controller) sampleDataFromTable(ctx context.Context, dbName string, tableMeta *mydump.MDTableMeta, tableInfo *model.TableInfo) error {
if len(tableMeta.DataFiles) == 0 {
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1756,7 +1756,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {

// We still need to sample source data even if this task has existed, because we need to judge whether the
// source is in order as row key to decide how to sort local data.
source, err := rc.EstimateSourceData(ctx)
source, err := rc.estimateSourceData(ctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1765,15 +1765,15 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
return errors.Trace(err)
}
if rc.cfg.App.CheckRequirements {
err = rc.LocalResource(source)
err = rc.localResource(source)
if err != nil {
return errors.Trace(err)
}
if err := rc.ClusterResource(ctx, source); err != nil {
if err := rc.clusterResource(ctx, source); err != nil {
rc.taskMgr.CleanupTask(ctx)
return errors.Trace(err)
}
if err := rc.CheckClusterRegion(ctx); err != nil {
if err := rc.checkClusterRegion(ctx); err != nil {
return errors.Trace(err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1713,7 +1713,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) {
sourceSize += size
return nil
})
err = rc.ClusterResource(ctx, sourceSize)
err = rc.clusterResource(ctx, sourceSize)
c.Assert(err, IsNil)

c.Assert(template.FailedCount(Critical), Equals, ca.expectErrorCount)
Expand Down Expand Up @@ -1835,7 +1835,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) {
cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}}
rc := &Controller{cfg: cfg, tls: tls, taskMgr: mockTaskMetaMgr{}, checkTemplate: template}

err := rc.CheckClusterRegion(context.Background())
err := rc.checkClusterRegion(context.Background())
c.Assert(err, IsNil)
c.Assert(template.FailedCount(Critical), Equals, ca.expectErrorCnt)
c.Assert(template.Success(), Equals, ca.expectResult)
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,10 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
dataWorker := rc.closedEngineLimit.Apply()
defer rc.closedEngineLimit.Recycle(dataWorker)
err = tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp)
for _, chunk := range ecp.Chunks {
rc.status.FinishedFileSize.Add(chunk.FileMeta.FileSize)
if rc.status != nil {
for _, chunk := range ecp.Chunks {
rc.status.FinishedFileSize.Add(chunk.FileMeta.FileSize)
Copy link
Contributor

@sleepymole sleepymole Sep 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple chunks may share a same file when we split large csv files. The correct value should be chunk.Chunk.EndOffset - chunk.Key.Offset.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if the task resumed from a checkpoint, we should calculate the total source size rather than only the amount processed in this run.

}
}
}
if err != nil {
Expand Down