Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: do not output pre-check info when disable check-requirements #27934

Merged
merged 10 commits into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion br/pkg/lightning/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
const (
// mydumper
ReadBlockSize ByteSize = 64 * units.KiB
MinRegionSize ByteSize = 256 * units.MiB
MaxRegionSize ByteSize = 256 * units.MiB
SplitRegionSize ByteSize = 96 * units.MiB
MaxSplitRegionSizeRatio int = 10
Expand Down
11 changes: 10 additions & 1 deletion br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Lightning struct {
server http.Server
serverAddr net.Addr
serverLock sync.Mutex
status restore.LightningStatus

cancelLock sync.Mutex
curTask *config.Config
Expand Down Expand Up @@ -301,7 +302,8 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.
web.BroadcastInitProgress(dbMetas)

var procedure *restore.Controller
procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, s, g)

procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, &l.status, s, g)
if err != nil {
log.L().Error("restore failed", log.ShortError(err))
return errors.Trace(err)
Expand All @@ -324,6 +326,13 @@ func (l *Lightning) Stop() {
l.shutdown()
}

// Status reports the import progress: the number of source-file bytes whose
// data has already been imported into TiKV, followed by the total number of
// source-file bytes for the task.
func (l *Lightning) Status() (int64, int64) {
	return l.status.FinishedFileSize.Load(), l.status.TotalFileSize.Load()
}
Little-Wallace marked this conversation as resolved.
Show resolved Hide resolved

func writeJSONError(w http.ResponseWriter, code int, prefix string, err error) {
type errorResponse struct {
Error string `json:"error"`
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func InitLogger(cfg *Config, tidbLoglevel string) error {
return nil
}

// SetAppLogger replaces this package's default logger with the given zap
// logger, configured to attach stack traces at DPanic level and above.
func SetAppLogger(l *zap.Logger) {
	appLogger = Logger{l.WithOptions(zap.AddStacktrace(zap.DPanicLevel))}
}

// L returns the current logger for Lightning.
func L() Logger {
return appLogger
Expand Down
11 changes: 4 additions & 7 deletions br/pkg/lightning/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@ import (

const (
// states used for the TableCounter labels
TableStatePending = "pending"
TableStateWritten = "written"
TableStateClosed = "closed"
TableStateImported = "imported"
TableStateAlteredAutoInc = "altered_auto_inc"
TableStateChecksum = "checksum"
TableStateCompleted = "completed"
TableStatePending = "pending"
TableStateWritten = "written"
TableStateImported = "imported"
TableStateCompleted = "completed"

// results used for the TableCounter labels
TableResultSuccess = "success"
Expand Down
43 changes: 24 additions & 19 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ func (rc *Controller) getClusterAvail(ctx context.Context) (uint64, error) {
return clusterAvail, nil
}

// ClusterResource check cluster has enough resource to import data. this test can by skipped.
func (rc *Controller) ClusterResource(ctx context.Context, localSource int64) error {
func (rc *Controller) clusterResource(ctx context.Context, localSource int64) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it's useful to keep the doc comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

passed := true
message := "Cluster resources are rich for this import task"
defer func() {
Expand Down Expand Up @@ -314,8 +313,7 @@ func (rc *Controller) checkRegionDistribution(ctx context.Context) error {
return nil
}

// CheckClusterRegion checks cluster if there are too many empty regions or region distribution is unbalanced.
func (rc *Controller) CheckClusterRegion(ctx context.Context) error {
func (rc *Controller) checkClusterRegion(ctx context.Context) error {
err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) {
restoreStarted := false
for _, task := range tasks {
Expand Down Expand Up @@ -390,7 +388,7 @@ func (rc *Controller) HasLargeCSV(dbMetas []*mydump.MDDatabaseMeta) error {
return nil
}

func (rc *Controller) EstimateSourceData(ctx context.Context) (int64, error) {
func (rc *Controller) estimateSourceData(ctx context.Context) (int64, error) {
sourceSize := int64(0)
originSource := int64(0)
bigTableCount := 0
Expand All @@ -404,21 +402,32 @@ func (rc *Controller) EstimateSourceData(ctx context.Context) (int64, error) {
for _, tbl := range db.Tables {
tableInfo, ok := info.Tables[tbl.Name]
if ok {
if err := rc.SampleDataFromTable(ctx, db.Name, tbl, tableInfo.Core); err != nil {
return sourceSize, errors.Trace(err)
}
sourceSize += int64(float64(tbl.TotalSize) * tbl.IndexRatio)
originSource += tbl.TotalSize
if tbl.TotalSize > int64(config.DefaultBatchSize)*2 {
bigTableCount += 1
if !tbl.IsRowOrdered {
unSortedTableCount += 1
// Do not sample small tables: there may be a large number of them, and
// sampling data for all of them would take a long time.
if tbl.TotalSize < int64(config.SplitRegionSize) {
sourceSize += tbl.TotalSize
tbl.IndexRatio = 1.0
tbl.IsRowOrdered = false
} else {
if err := rc.sampleDataFromTable(ctx, db.Name, tbl, tableInfo.Core); err != nil {
return sourceSize, errors.Trace(err)
}
sourceSize += int64(float64(tbl.TotalSize) * tbl.IndexRatio)
if tbl.TotalSize > int64(config.DefaultBatchSize)*2 {
bigTableCount += 1
if !tbl.IsRowOrdered {
unSortedTableCount += 1
}
}
}
tableCount += 1
}
}
}
if rc.status != nil {
rc.status.TotalFileSize.Store(originSource)
}

// Do not import with too large concurrency because these data may be all unsorted.
if bigTableCount > 0 && unSortedTableCount > 0 {
Expand All @@ -429,8 +438,7 @@ func (rc *Controller) EstimateSourceData(ctx context.Context) (int64, error) {
return sourceSize, nil
}

// LocalResource checks the local node has enough resources for this import when local backend enabled;
func (rc *Controller) LocalResource(ctx context.Context, sourceSize int64) error {
func (rc *Controller) localResource(sourceSize int64) error {
if rc.isSourceInLocal() {
sourceDir := strings.TrimPrefix(rc.cfg.Mydumper.SourceDir, storage.LocalURIPrefix)
same, err := common.SameDisk(sourceDir, rc.cfg.TikvImporter.SortedKVDir)
Expand All @@ -449,9 +457,6 @@ func (rc *Controller) LocalResource(ctx context.Context, sourceSize int64) error
return errors.Trace(err)
}
localAvailable := storageSize.Available
if err = rc.taskMgr.InitTask(ctx, sourceSize); err != nil {
return errors.Trace(err)
}

var message string
var passed bool
Expand Down Expand Up @@ -732,7 +737,7 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
return msgs, nil
}

func (rc *Controller) SampleDataFromTable(ctx context.Context, dbName string, tableMeta *mydump.MDTableMeta, tableInfo *model.TableInfo) error {
func (rc *Controller) sampleDataFromTable(ctx context.Context, dbName string, tableMeta *mydump.MDTableMeta, tableInfo *model.TableInfo) error {
if len(tableMeta.DataFiles) == 0 {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/check_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *SimpleTemplate) Collect(t CheckType, passed bool, msg string) {
}

func (c *SimpleTemplate) Success() bool {
return c.warnFailedCount+c.criticalFailedCount == 0
return c.criticalFailedCount == 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// If the performance is not as expect or one of critical check not passed. it will stop import task.
Collect(t CheckType, passed bool, msg string)

If c.warnFailedCount > 0, the import task should also be failed. @3pointer PTAL

}

func (c *SimpleTemplate) FailedCount(t CheckType) int {
Expand Down
62 changes: 41 additions & 21 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,22 +257,30 @@ type Controller struct {
diskQuotaLock *diskQuotaLock
diskQuotaState atomic.Int32
compactState atomic.Int32
status *LightningStatus
}

// LightningStatus exposes the import progress of a Lightning task.
// Both counters are safe for concurrent access.
type LightningStatus struct {
	// FinishedFileSize is the cumulative size, in bytes, of the source data
	// that has already been imported into TiKV.
	FinishedFileSize atomic.Int64
	// TotalFileSize is the total size, in bytes, of all source files for
	// the task.
	TotalFileSize atomic.Int64
}

// NewRestoreController creates a Controller that restores the databases in
// dbMetas from the external storage s into the target cluster, reporting
// progress through status. It delegates to NewRestoreControllerWithPauser
// using the default DeliverPauser.
func NewRestoreController(
	ctx context.Context,
	dbMetas []*mydump.MDDatabaseMeta,
	cfg *config.Config,
	status *LightningStatus,
	s storage.ExternalStorage,
	g glue.Glue,
) (*Controller, error) {
	return NewRestoreControllerWithPauser(ctx, dbMetas, cfg, status, s, DeliverPauser, g)
}

func NewRestoreControllerWithPauser(
ctx context.Context,
dbMetas []*mydump.MDDatabaseMeta,
cfg *config.Config,
status *LightningStatus,
s storage.ExternalStorage,
pauser *common.Pauser,
g glue.Glue,
Expand Down Expand Up @@ -379,6 +387,7 @@ func NewRestoreControllerWithPauser(
store: s,
metaMgrBuilder: metaBuilder,
diskQuotaLock: newDiskQuotaLock(),
status: status,
taskMgr: nil,
}

Expand Down Expand Up @@ -1716,13 +1725,16 @@ func (rc *Controller) isLocalBackend() bool {
// 4. Lightning configuration
// before restore tables start.
func (rc *Controller) preCheckRequirements(ctx context.Context) error {
if err := rc.ClusterIsAvailable(ctx); err != nil {
return errors.Trace(err)
}
if rc.cfg.App.CheckRequirements {
if err := rc.ClusterIsAvailable(ctx); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if !rc.cfg.App.CheckRequirements {

this line can be removed

return errors.Trace(err)
}

if err := rc.StoragePermission(ctx); err != nil {
return errors.Trace(err)
if err := rc.StoragePermission(ctx); err != nil {
return errors.Trace(err)
}
}

if err := rc.metaMgrBuilder.Init(ctx); err != nil {
return err
}
Expand All @@ -1741,29 +1753,37 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}

// We still need to sample source data even if this task has existed, because we need to judge whether the
// source is in order as row key to decide how to sort local data.
source, err := rc.estimateSourceData(ctx)
if err != nil {
return errors.Trace(err)
}
if !taskExist {
source, err := rc.EstimateSourceData(ctx)
if err != nil {
if err = rc.taskMgr.InitTask(ctx, source); err != nil {
return errors.Trace(err)
}
err = rc.LocalResource(ctx, source)
if err != nil {
rc.taskMgr.CleanupTask(ctx)
return errors.Trace(err)
}
if err := rc.ClusterResource(ctx, source); err != nil {
rc.taskMgr.CleanupTask(ctx)
return errors.Trace(err)
}
if err := rc.CheckClusterRegion(ctx); err != nil {
return errors.Trace(err)
if rc.cfg.App.CheckRequirements {
err = rc.localResource(source)
if err != nil {
return errors.Trace(err)
}
if err := rc.clusterResource(ctx, source); err != nil {
rc.taskMgr.CleanupTask(ctx)
return errors.Trace(err)
}
if err := rc.checkClusterRegion(ctx); err != nil {
return errors.Trace(err)
}
}
}
}
if rc.tidbGlue.OwnsSQLExecutor() {

if rc.tidbGlue.OwnsSQLExecutor() && rc.cfg.App.CheckRequirements {
// print check info at any time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

fmt.Print(rc.checkTemplate.Output())
if rc.cfg.App.CheckRequirements && !rc.checkTemplate.Success() {
if !rc.checkTemplate.Success() {
// if check requirements is true, return error.
if !taskExist && rc.taskMgr != nil {
rc.taskMgr.CleanupTask(ctx)
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1713,7 +1713,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) {
sourceSize += size
return nil
})
err = rc.ClusterResource(ctx, sourceSize)
err = rc.clusterResource(ctx, sourceSize)
c.Assert(err, IsNil)

c.Assert(template.FailedCount(Critical), Equals, ca.expectErrorCount)
Expand Down Expand Up @@ -1835,7 +1835,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) {
cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}}
rc := &Controller{cfg: cfg, tls: tls, taskMgr: mockTaskMetaMgr{}, checkTemplate: template}

err := rc.CheckClusterRegion(context.Background())
err := rc.checkClusterRegion(context.Background())
c.Assert(err, IsNil)
c.Assert(template.FailedCount(Critical), Equals, ca.expectErrorCnt)
c.Assert(template.Success(), Equals, ca.expectResult)
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,20 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
dataWorker := rc.closedEngineLimit.Apply()
defer rc.closedEngineLimit.Recycle(dataWorker)
err = tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp)
if rc.status != nil {
for _, chunk := range ecp.Chunks {
rc.status.FinishedFileSize.Add(chunk.FileMeta.FileSize)
Copy link
Contributor

@sleepymole sleepymole Sep 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple chunks may share the same file when we split large CSV files. The correct value should be chunk.Chunk.EndOffset - chunk.Key.Offset.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if the task resumed from a checkpoint, we should count the table's total size rather than only the amount imported during this run.

}
}
}
if err != nil {
setError(err)
}
}(restoreWorker, engineID, engine)
} else {
for _, chunk := range engine.Chunks {
rc.status.FinishedFileSize.Add(chunk.FileMeta.FileSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}
}
}

Expand Down