Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
Signed-off-by: wjhuang2016 <huangwenjun1997@gmail.com>
  • Loading branch information
wjhuang2016 committed Apr 20, 2023
2 parents 0e64a68 + eb77d39 commit df6c7b4
Show file tree
Hide file tree
Showing 33 changed files with 532 additions and 121 deletions.
16 changes: 11 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -159,23 +159,29 @@ else
CGO_ENABLED=1 $(GOBUILD) -gcflags="all=-N -l" $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG)' -o '$(TARGET)' ./tidb-server
endif

# Register the repository's git submodules, then force-update their checkouts.
init-submodule:
	git submodule init && git submodule update --force

enterprise-prepare:
git submodule init && git submodule update && cd extension/enterprise/generate && $(GO) generate -run genfile main.go
cd extension/enterprise/generate && $(GO) generate -run genfile main.go

# Run the go:generate directives in extension/enterprise/generate/main.go
# that match `clear`.
enterprise-clear:
	cd extension/enterprise/generate && $(GO) generate -run clear main.go

enterprise-docker: enterprise-prepare
enterprise-docker: init-submodule enterprise-prepare
docker build -t "$(DOCKERPREFIX)tidb:latest" --build-arg 'GOPROXY=$(shell go env GOPROXY),' -f Dockerfile.enterprise .

enterprise-server-build:
ifeq ($(TARGET), "")
CGO_ENABLED=1 $(GOBUILD) -tags enterprise $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG)' -o bin/tidb-server tidb-server/main.go
CGO_ENABLED=1 $(GOBUILD) -tags enterprise $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG) $(EXTENSION_FLAG)' -o bin/tidb-server tidb-server/main.go
else
CGO_ENABLED=1 $(GOBUILD) -tags enterprise $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG)' -o '$(TARGET)' tidb-server/main.go
CGO_ENABLED=1 $(GOBUILD) -tags enterprise $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG) $(EXTENSION_FLAG)' -o '$(TARGET)' tidb-server/main.go
endif

enterprise-server: enterprise-prepare enterprise-server-build
# Build the enterprise server end to end: initialise the submodules, generate
# the enterprise glue code, then compile.
# Each step runs in its own sub-make, one after another, so every step
# re-reads the makefiles only after the previous step has finished.
# $(MAKE) is used instead of a literal `make` so the sub-makes use the same
# make binary and inherit its flags and jobserver.
enterprise-server:
	@$(MAKE) init-submodule
	@$(MAKE) enterprise-prepare
	@$(MAKE) enterprise-server-build

server_check:
ifeq ($(TARGET), "")
Expand Down
5 changes: 5 additions & 0 deletions Makefile.common
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ LDFLAGS += -X "github.com/pingcap/tidb/util/versioninfo.TiDBGitHash=$(shell git
LDFLAGS += -X "github.com/pingcap/tidb/util/versioninfo.TiDBGitBranch=$(shell git rev-parse --abbrev-ref HEAD)"
LDFLAGS += -X "github.com/pingcap/tidb/util/versioninfo.TiDBEdition=$(TIDB_EDITION)"

# Linker flag that records the commit hash of the enterprise extension
# checkout. It stays empty when extension/enterprise/.git does not exist.
EXTENSION_FLAG =
# $(wildcard) yields the path only if it exists, so no shell has to be forked
# for the existence check and the result does not depend on which /bin/sh is
# installed (unary `[ -a file ]` is not a POSIX test primary).
ifneq ($(wildcard extension/enterprise/.git),)
EXTENSION_FLAG += -X "github.com/pingcap/tidb/util/versioninfo.TiDBEnterpriseExtensionGitHash=$(shell cd extension/enterprise && git rev-parse HEAD)"
endif

TEST_LDFLAGS = -X "github.com/pingcap/tidb/config.checkBeforeDropLDFlag=1"
COVERAGE_SERVER_LDFLAGS = -X "github.com/pingcap/tidb/tidb-server.isCoverageServer=1"

Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/mydump/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"@com_github_xitongsys_parquet_go//reader",
"@com_github_xitongsys_parquet_go//source",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
"@org_golang_x_text//encoding",
"@org_golang_x_text//encoding/charmap",
"@org_golang_x_text//encoding/simplifiedchinese",
Expand Down
121 changes: 48 additions & 73 deletions br/pkg/lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/util/mathutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -217,84 +218,53 @@ func MakeTableRegions(

start := time.Now()

execCtx, cancel := context.WithCancel(ctx)
defer cancel()

concurrency := mathutil.Max(cfg.Concurrency, 2)
fileChan := make(chan FileInfo, concurrency)
resultChan := make(chan fileRegionRes, concurrency)
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for info := range fileChan {
var (
regions []*TableRegion
sizes []float64
err error
)
dataFileSize := info.FileMeta.FileSize
if info.FileMeta.Type == SourceTypeParquet {
regions, sizes, err = makeParquetFileRegion(ctx, cfg, info)
} else if info.FileMeta.Type == SourceTypeCSV && cfg.StrictFormat &&
info.FileMeta.Compression == CompressionNone &&
dataFileSize > cfg.MaxChunkSize+cfg.MaxChunkSize/largeCSVLowerThresholdRation {
// If a csv file is overlarge, we need to split it into multiple regions.
// Note: We can only split a csv file whose format is strict.
// We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
// like dumpling might be slight exceed the threshold when it is equal `max-region-size`, so we can
// avoid split a lot of small chunks.
// If a csv file is compressed, we can't split it now because we can't get the exact size of a row.
regions, sizes, err = SplitLargeCSV(ctx, cfg, info)
} else {
regions, sizes, err = MakeSourceFileRegion(execCtx, cfg, info)
}
select {
case resultChan <- fileRegionRes{info: info, regions: regions, sizes: sizes, err: err}:
case <-ctx.Done():
return
}
if err != nil {
log.FromContext(ctx).Error("make source file region error", zap.Error(err), zap.String("file_path", info.FileMeta.Path))
break
}
}
}()
}
var fileRegionsMap sync.Map

go func() {
wg.Wait()
close(resultChan)
}()

errChan := make(chan error, 1)
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(concurrency)
meta := cfg.TableMeta
fileRegionsMap := make(map[string]fileRegionRes, len(meta.DataFiles))
go func() {
for res := range resultChan {
if res.err != nil {
errChan <- res.err
return
for _, info := range meta.DataFiles {
info := info
eg.Go(func() error {
select {
case <-egCtx.Done():
return nil
default:
}
fileRegionsMap[res.info.FileMeta.Path] = res
}
errChan <- nil
}()

for _, dataFile := range meta.DataFiles {
select {
case fileChan <- dataFile:
case <-ctx.Done():
close(fileChan)
return nil, ctx.Err()
case err := <-errChan:
return nil, err
}
var (
regions []*TableRegion
sizes []float64
err error
)
dataFileSize := info.FileMeta.FileSize
if info.FileMeta.Type == SourceTypeParquet {
regions, sizes, err = makeParquetFileRegion(egCtx, cfg, info)
} else if info.FileMeta.Type == SourceTypeCSV && cfg.StrictFormat &&
info.FileMeta.Compression == CompressionNone &&
dataFileSize > cfg.MaxChunkSize+cfg.MaxChunkSize/largeCSVLowerThresholdRation {
// If a csv file is overlarge, we need to split it into multiple regions.
// Note: We can only split a csv file whose format is strict.
// We increase the check threshold by 1/10 of the `max-region-size` because the size of a source file dumped by tools
// like dumpling might slightly exceed the threshold when it is set equal to `max-region-size`; this way we
// avoid splitting such a file into a lot of small chunks.
// If a csv file is compressed, we can't split it now because we can't get the exact size of a row.
regions, sizes, err = SplitLargeCSV(egCtx, cfg, info)
} else {
regions, sizes, err = MakeSourceFileRegion(egCtx, cfg, info)
}
if err != nil {
log.FromContext(egCtx).Error("make source file region error", zap.Error(err), zap.String("file_path", info.FileMeta.Path))
return err
}
result := fileRegionRes{info: info, regions: regions, sizes: sizes, err: err}
fileRegionsMap.Store(info.FileMeta.Path, result)
return nil
})
}
close(fileChan)
err := <-errChan
if err != nil {

if err := eg.Wait(); err != nil {
return nil, err
}

Expand All @@ -303,7 +273,12 @@ func MakeTableRegions(
// rebase row-id for all chunk
rowIDBase := int64(0)
for _, dataFile := range meta.DataFiles {
fileRegionsRes := fileRegionsMap[dataFile.FileMeta.Path]
v, ok := fileRegionsMap.Load(dataFile.FileMeta.Path)
if !ok {
return nil, errors.Errorf("file %s not found in MakeTableRegions", dataFile.FileMeta.Path)
}
//nolint: forcetypeassert
fileRegionsRes := v.(fileRegionRes)
for _, region := range fileRegionsRes.regions {
region.Chunk.PrevRowIDMax += rowIDBase
region.Chunk.RowIDMax += rowIDBase
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/lightning/mydump/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,13 @@ func TestMakeTableRegionsSplitLargeFile(t *testing.T) {
assert.Equal(t, int64(0), regions[0].Chunk.Offset)
assert.Equal(t, TableFileSizeINF, regions[0].Chunk.EndOffset)
assert.Len(t, regions[0].Chunk.Columns, 0)

// test canceled context will not panic
ctx, cancel := context.WithCancel(context.Background())
cancel()
for i := 0; i < 20; i++ {
_, _ = MakeTableRegions(ctx, divideConfig)
}
}

func TestCompressedMakeSourceFileRegion(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions build/print-workspace-status.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ TiDB_BUILD_UTCTIME=$(date -u '+%Y-%m-%d %H:%M:%S')
TIDB_GIT_HASH=$(git rev-parse HEAD)
TIDB_GIT_BRANCH=$(git rev-parse --abbrev-ref HEAD)
TIDB_EDITION=${TIDB_EDITION:-Community}
# Commit hash of the enterprise extension checkout; left empty when
# extension/enterprise/.git does not exist.
TIDB_ENTERPRISE_EXTENSION_GIT_HASH=""
# -e is the standard existence test; unary -a is a legacy, non-POSIX alias.
if [ -e "extension/enterprise/.git" ]; then
  TIDB_ENTERPRISE_EXTENSION_GIT_HASH=$(cd extension/enterprise && git rev-parse HEAD)
fi

cat <<EOF
STABLE_TiDB_RELEASE_VERSION ${TiDB_RELEASE_VERSION}
STABLE_TiDB_BUILD_UTCTIME ${TiDB_BUILD_UTCTIME}
STABLE_TIDB_GIT_HASH ${TIDB_GIT_HASH}
STABLE_TIDB_GIT_BRANCH ${TIDB_GIT_BRANCH}
STABLE_TIDB_EDITION ${TIDB_EDITION}
STABLE_TIDB_ENTERPRISE_EXTENSION_GIT_HASH ${TIDB_ENTERPRISE_EXTENSION_GIT_HASH}
EOF
2 changes: 0 additions & 2 deletions ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@ func (bc *litBackendCtx) FinishImport(indexID int64, unique bool, tbl table.Tabl
return nil
}

const importThreshold = 0.85

// Flush checks the disk quota and imports the current key-values in engine to the storage.
func (bc *litBackendCtx) Flush(indexID int64, force bool) (flushed, imported bool, err error) {
ei, exist := bc.Load(indexID)
Expand Down
5 changes: 5 additions & 0 deletions ddl/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ func (b *backfillSchedulerHandle) SplitSubtask(_ context.Context, subtask []byte
return nil, consumer.getResult()
}

// OnSubtaskFinished implements the Scheduler interface.
// The backfill handle does nothing when a subtask finishes: both arguments
// are ignored and nil is always returned.
func (*backfillSchedulerHandle) OnSubtaskFinished(_ context.Context, _ []byte) error {
	return nil
}

// CleanupSubtaskExecEnv implements the Scheduler interface.
func (b *backfillSchedulerHandle) CleanupSubtaskExecEnv(context.Context) error {
logutil.BgLogger().Info("[ddl] lightning cleanup subtask exec env")
Expand Down
4 changes: 4 additions & 0 deletions disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (t *testScheduler) SplitSubtask(_ context.Context, subtask []byte) ([]proto
}, nil
}

// OnSubtaskFinished is a no-op for the test scheduler; it ignores its
// arguments and always reports success.
func (*testScheduler) OnSubtaskFinished(context.Context, []byte) error {
	return nil
}

// testSubtaskExecutor is a test helper that carries a pointer to an atomic
// int64. How the value is used is not visible in this excerpt.
type testSubtaskExecutor struct {
	// v points to the atomic int64 carried by this executor.
	v *atomic.Int64
}
Expand Down
2 changes: 2 additions & 0 deletions disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Scheduler interface {
SplitSubtask(ctx context.Context, subtask []byte) ([]proto.MinimalTask, error)
// CleanupSubtaskExecEnv is used to clean up the environment for the subtask executor.
CleanupSubtaskExecEnv(context.Context) error
// OnSubtaskFinished is used to handle the subtask when it is finished.
OnSubtaskFinished(ctx context.Context, subtask []byte) error
// Rollback is used to rollback all subtasks.
Rollback(context.Context) error
}
Expand Down
10 changes: 8 additions & 2 deletions disttask/framework/scheduler/interface_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,20 @@ func (m *MockScheduler) InitSubtaskExecEnv(ctx context.Context) error {
}

// SplitSubtask implements Scheduler.SplitSubtask.
func (m *MockScheduler) SplitSubtask(_ context.Context, subtask []byte) ([]proto.MinimalTask, error) {
args := m.Called(subtask)
func (m *MockScheduler) SplitSubtask(ctx context.Context, subtask []byte) ([]proto.MinimalTask, error) {
args := m.Called(ctx, subtask)
if args.Error(1) != nil {
return nil, args.Error(1)
}
return args.Get(0).([]proto.MinimalTask), nil
}

// OnSubtaskFinished implements Scheduler.OnSubtaskFinished.
// It forwards both arguments to m.Called and returns the error held at
// index 0 of the result.
func (m *MockScheduler) OnSubtaskFinished(ctx context.Context, subtask []byte) error {
	return m.Called(ctx, subtask).Error(0)
}

// CleanupSubtaskExecEnv implements Scheduler.CleanupSubtaskExecEnv.
func (m *MockScheduler) CleanupSubtaskExecEnv(ctx context.Context) error {
args := m.Called(ctx)
Expand Down
2 changes: 1 addition & 1 deletion disttask/framework/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,13 @@ func (m *Manager) fetchAndFastCancelTasks(ctx context.Context) {
func (m *Manager) onRunnableTasks(ctx context.Context, tasks []*proto.Task) {
tasks = m.filterAlreadyHandlingTasks(tasks)
for _, task := range tasks {
logutil.Logger(m.logCtx).Info("onRunnableTasks", zap.Any("task", task))
if _, ok := m.subtaskExecutorPools[task.Type]; !ok {
logutil.Logger(m.logCtx).Error("unknown task type", zap.String("type", task.Type))
continue
}
exist, err := m.taskTable.HasSubtasksInStates(m.id, task.ID, proto.TaskStatePending, proto.TaskStateRevertPending)
if err != nil {
logutil.Logger(m.logCtx).Error("check subtask exist failed", zap.Error(err))
m.onError(err)
continue
}
Expand Down
7 changes: 6 additions & 1 deletion disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (s *InternalSchedulerImpl) Run(ctx context.Context, task *proto.Task) error
}

var minimalTasks []proto.MinimalTask
minimalTasks, err = scheduler.SplitSubtask(context.Background(), subtask.Meta)
minimalTasks, err = scheduler.SplitSubtask(runCtx, subtask.Meta)
if err != nil {
s.onError(err)
if errors.Cause(err) == context.Canceled {
Expand All @@ -160,6 +160,11 @@ func (s *InternalSchedulerImpl) Run(ctx context.Context, task *proto.Task) error
}
}
minimalTaskWg.Wait()
if err := s.getError(); err == nil {
if err := scheduler.OnSubtaskFinished(runCtx, subtask.Meta); err != nil {
s.onError(err)
}
}
if err := s.getError(); err != nil {
if errors.Cause(err) == context.Canceled {
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateCanceled, "")
Expand Down
11 changes: 6 additions & 5 deletions disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestSchedulerRun(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateFailed).Return(nil).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
err = scheduler.Run(runCtx, &proto.Task{Type: tp, ID: taskID, Concurrency: concurrency})
Expand All @@ -105,7 +105,7 @@ func TestSchedulerRun(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(runSubtaskErr).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateFailed).Return(nil).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
Expand All @@ -117,8 +117,9 @@ func TestSchedulerRun(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once()
mockScheduler.On("OnSubtaskFinished", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateSucceed).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(nil, nil).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
Expand All @@ -130,7 +131,7 @@ func TestSchedulerRun(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}, MockMinimalTask{}}, nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}, MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(context.Canceled).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateCanceled).Return(nil).Once()
Expand Down Expand Up @@ -244,7 +245,7 @@ func TestScheduler(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(runSubtaskErr).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateFailed).Return(nil).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
Expand Down
6 changes: 6 additions & 0 deletions disttask/loaddata/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ func (s *ImportScheduler) SplitSubtask(ctx context.Context, bs []byte) ([]proto.
return miniTask, nil
}

// OnSubtaskFinished implements the Scheduler.OnSubtaskFinished interface.
// It only writes an info-level log entry carrying the scheduler's task meta;
// the context and the subtask payload are not inspected.
func (s *ImportScheduler) OnSubtaskFinished(_ context.Context, _ []byte) error {
	logger := logutil.BgLogger()
	logger.Info("OnSubtaskFinished", zap.Any("taskMeta", s.taskMeta))
	return nil
}

// CleanupSubtaskExecEnv implements the Scheduler.CleanupSubtaskExecEnv interface.
func (s *ImportScheduler) CleanupSubtaskExecEnv(ctx context.Context) (err error) {
defer func() {
Expand Down
Loading

0 comments on commit df6c7b4

Please sign in to comment.