ddl: remove unused code related to prev fast create impl (pingcap#55116)
D3Hunter authored Aug 1, 2024
1 parent 5c2dde8 commit da7ed5c
Showing 21 changed files with 83 additions and 547 deletions.
6 changes: 3 additions & 3 deletions br/pkg/restore/ingestrec/ingest_recorder_test.go
@@ -162,7 +162,7 @@ func TestAddIngestRecorder(t *testing.T) {
},
State: model.StatePublic,
}
err = m.CreateTableOrView(1, dbInfo.Name.L, tblInfo)
err = m.CreateTableOrView(1, tblInfo)
require.NoError(t, err)
})
dom, err := session.GetDomain(store)
@@ -357,7 +357,7 @@ func TestIndexesKind(t *testing.T) {
},
State: model.StatePublic,
}
err = m.CreateTableOrView(1, dbInfo.Name.L, tblInfo)
err = m.CreateTableOrView(1, tblInfo)
require.NoError(t, err)
})
dom, err := session.GetDomain(store)
@@ -454,7 +454,7 @@ func TestRewriteTableID(t *testing.T) {
},
State: model.StatePublic,
}
err = m.CreateTableOrView(1, dbInfo.Name.L, tblInfo)
err = m.CreateTableOrView(1, tblInfo)
require.NoError(t, err)
})
dom, err := session.GetDomain(store)
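The call-site updates in this file (and in the lightning test below) only drop one argument: CreateTableOrView no longer takes the lowercase schema name, just the owning database ID and the table info. The following is a minimal sketch of a caller after this change; the helper function, import paths, and transaction wrapper are assumptions based on how the touched tests build their meta.Meta, not part of this commit.

package example

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/meta"
	"github.com/pingcap/tidb/pkg/parser/model"
)

// createTableMeta mirrors what the updated tests do: register the database,
// then write the table record with only the owning database ID and table info.
func createTableMeta(ctx context.Context, store kv.Storage, dbInfo *model.DBInfo, tblInfo *model.TableInfo) error {
	ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
	return kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
		m := meta.NewMeta(txn)
		if err := m.CreateDatabase(dbInfo); err != nil && !errors.ErrorEqual(err, meta.ErrDBExists) {
			return err
		}
		// Before this commit the call also carried the schema name:
		//   m.CreateTableOrView(dbInfo.ID, dbInfo.Name.L, tblInfo)
		return m.CreateTableOrView(dbInfo.ID, tblInfo)
	})
}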
2 changes: 1 addition & 1 deletion lightning/pkg/importer/meta_manager_test.go
@@ -81,7 +81,7 @@ func newTableRestore(t *testing.T,
if err := m.CreateDatabase(&model.DBInfo{ID: dbInfo.ID}); err != nil && !errors.ErrorEqual(err, meta.ErrDBExists) {
return err
}
return m.CreateTableOrView(dbInfo.ID, db, ti.Core)
return m.CreateTableOrView(dbInfo.ID, ti.Core)
})
require.NoError(t, err)

45 changes: 2 additions & 43 deletions pkg/ddl/ddl.go
@@ -21,18 +21,15 @@ package ddl
import (
"context"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/logutil"
@@ -86,7 +83,6 @@ const (

reorgWorkerCnt = 10
generalWorkerCnt = 10
localWorkerCnt = 10

// checkFlagIndexInJobArgs is the recoverCheckFlag index used in RecoverTable/RecoverSchema job arg list.
checkFlagIndexInJobArgs = 1
@@ -253,16 +249,12 @@ type ddl struct {
sessPool *sess.Pool
delRangeMgr delRangeManager
enableTiFlashPoll *atomicutil.Bool
// used in the concurrency ddl.
localWorkerPool *workerPool
// get notification if any DDL job submitted or finished.
ddlJobNotifyCh chan struct{}
sysTblMgr systable.Manager
minJobIDRefresher *systable.MinJobIDRefresher

// localJobCh is used to delivery job in local TiDB nodes.
localJobCh chan *JobWrapper
// globalIDLocal locks global id to reduce write conflict.
// globalIDLock locks global id to reduce write conflict.
globalIDLock sync.Mutex
executor *executor
}
@@ -681,7 +673,6 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) {
limitJobCh: make(chan *JobWrapper, batchAddingJobs),
enableTiFlashPoll: atomicutil.NewBool(true),
ddlJobNotifyCh: make(chan struct{}, 100),
localJobCh: make(chan *JobWrapper, 1),
}

taskexecutor.RegisterTaskType(proto.Backfill,
@@ -744,33 +735,6 @@ func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager {
return delRangeMgr
}

func (d *ddl) prepareLocalModeWorkers() {
var idAllocator atomic.Uint64
workerFactory := func(tp workerType) func() (pools.Resource, error) {
return func() (pools.Resource, error) {
wk := newWorker(d.ctx, tp, d.sessPool, d.delRangeMgr, d.ddlCtx)
sessForJob, err := d.sessPool.Get()
if err != nil {
return nil, err
}
sessForJob.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
wk.sess = sess.NewSession(sessForJob)
wk.seqAllocator = &idAllocator
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, wk.String())).Inc()
return wk, nil
}
}
// local worker count at least 2 at most 10.
localCnt := min(max(runtime.GOMAXPROCS(0)/4, 2), localWorkerCnt)
d.localWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(localWorker), localCnt, localCnt, 0), jobTypeLocal)
failpoint.Inject("NoDDLDispatchLoop", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return()
}
})
d.wg.Run(d.startLocalWorkerLoop)
}

// Start implements DDL.Start interface.
func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
logutil.DDLLogger().Info("start DDL", zap.String("ID", d.uuid), zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()))
@@ -780,7 +744,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
d.sysTblMgr = systable.NewManager(d.sessPool)
d.minJobIDRefresher = systable.NewMinJobIDRefresher(d.sysTblMgr)
d.wg.Run(func() {
d.limitDDLJobs(d.limitJobCh, d.addBatchDDLJobsV1)
d.limitDDLJobs()
})
d.wg.Run(func() {
d.minJobIDRefresher.Start(d.ctx)
@@ -796,8 +760,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
ddl: d,
})

d.prepareLocalModeWorkers()

if config.TableLockEnabled() {
d.wg.Add(1)
go d.startCleanDeadTableLock()
@@ -892,9 +854,6 @@ func (d *ddl) close() {
d.wg.Wait()
d.ownerManager.Cancel()
d.schemaSyncer.Close()
if d.localWorkerPool != nil {
d.localWorkerPool.close()
}

// d.delRangeMgr using sessions from d.sessPool.
// Put it before d.sessPool.close to reduce the time spent by d.sessPool.close.
35 changes: 8 additions & 27 deletions pkg/ddl/executor.go
@@ -10057,10 +10057,11 @@ func (e *executor) delJobDoneCh(jobID int64) {
e.ddlJobDoneChMap.Delete(jobID)
}

func (d *ddl) limitDDLJobs(ch chan *JobWrapper, handler func([]*JobWrapper)) {
func (d *ddl) limitDDLJobs() {
defer util.Recover(metrics.LabelDDL, "limitDDLJobs", nil, true)

jobWs := make([]*JobWrapper, 0, batchAddingJobs)
ch := d.limitJobCh
for {
select {
// the channel is never closed
@@ -10072,7 +10073,7 @@ func (d *ddl) limitDDLJobs(ch chan *JobWrapper, handler func([]*JobWrapper)) {
for i := 0; i < jobLen; i++ {
jobWs = append(jobWs, <-ch)
}
handler(jobWs)
d.addBatchDDLJobs(jobWs)
case <-d.ctx.Done():
return
}
@@ -10115,8 +10116,8 @@ func (e *executor) deliverJobTask(task *JobWrapper) {
e.limitJobCh <- task
}

// addBatchDDLJobsV1 gets global job IDs and puts the DDL jobs in the DDL queue.
func (d *ddl) addBatchDDLJobsV1(jobWs []*JobWrapper) {
// addBatchDDLJobs gets global job IDs and puts the DDL jobs in the DDL queue.
func (d *ddl) addBatchDDLJobs(jobWs []*JobWrapper) {
startTime := time.Now()
var (
err error
@@ -10134,7 +10135,7 @@ func (d *ddl) addBatchDDLJobsV1(jobWs []*JobWrapper) {
jobWs = newWs
}
}
err = d.addBatchDDLJobs(jobWs)
err = d.addBatchDDLJobs2Table(jobWs)
} else {
err = d.addBatchDDLJobs2Queue(jobWs)
}
@@ -10159,24 +10160,6 @@ func (d *ddl) addBatchDDLJobsV1(jobWs []*JobWrapper) {
}
}

// addBatchLocalDDLJobs gets global job IDs and delivery the DDL jobs to local TiDB
func (d *ddl) addBatchLocalDDLJobs(jobWs []*JobWrapper) {
if newJobWs, err := mergeCreateTableJobs(jobWs); err == nil {
jobWs = newJobWs
}
err := d.addBatchDDLJobs(jobWs)
if err != nil {
for _, jobW := range jobWs {
jobW.NotifyResult(err)
}
logutil.DDLLogger().Error("add DDL jobs failed", zap.Bool("local_mode", true), zap.Error(err))
} else {
logutil.DDLLogger().Info("add DDL jobs",
zap.Bool("local_mode", true),
zap.Int("batch count", len(jobWs)))
}
}

func (d *ddl) addBatchDDLJobs2Queue(jobWs []*JobWrapper) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
// lock to reduce conflict
@@ -10234,8 +10217,8 @@ func (*ddl) checkFlashbackJobInQueue(t *meta.Meta) error {
return nil
}

// addBatchDDLJobs gets global job IDs and puts the DDL jobs in the DDL job table or local worker.
func (d *ddl) addBatchDDLJobs(jobWs []*JobWrapper) error {
// addBatchDDLJobs2Table gets global job IDs and puts the DDL jobs in the DDL job table.
func (d *ddl) addBatchDDLJobs2Table(jobWs []*JobWrapper) error {
var err error

if len(jobWs) == 0 {
@@ -10270,7 +10253,6 @@ func (d *ddl) addBatchDDLJobs(jobWs []*JobWrapper) error {
}
startTS = txn.StartTS()

// for localmode, we still need to check this variable if upgrading below v6.2.
if variable.DDLForce2Queue.Load() {
if err := d.checkFlashbackJobInQueue(t); err != nil {
return err
@@ -10304,7 +10286,6 @@ func (d *ddl) addBatchDDLJobs(jobWs []*JobWrapper) error {

setJobStateToQueueing(job)

// currently doesn't support pause job in local mode.
if d.stateSyncer.IsUpgradingState() && !hasSysDB(job) {
if err = pauseRunningJob(sess.NewSession(se), job, model.AdminCommandBySystem); err != nil {
logutil.DDLUpgradingLogger().Warn("pause user DDL by system failed", zap.Stringer("job", job), zap.Error(err))
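The surviving limitDDLJobs body is a standard drain-then-batch loop: block on the first JobWrapper, sweep up whatever is already buffered in the channel, then hand the whole slice to addBatchDDLJobs in a single call. The following is a self-contained sketch of that pattern; the types, channel sizes, and names are invented for illustration and are not TiDB APIs.

package main

import "fmt"

// batchLoop blocks on the first item, then drains whatever is already queued
// so that one handle() call covers the whole burst, mirroring limitDDLJobs.
func batchLoop(ch <-chan int, done <-chan struct{}, handle func([]int)) {
	buf := make([]int, 0, 8)
	for {
		select {
		case first := <-ch:
			buf = append(buf, first)
			// len(ch) is a snapshot; later arrivals wait for the next round.
			for i, n := 0, len(ch); i < n; i++ {
				buf = append(buf, <-ch)
			}
			handle(buf)
			buf = buf[:0]
		case <-done:
			return
		}
	}
}

func main() {
	ch := make(chan int, 16)
	done := make(chan struct{})
	for i := 1; i <= 5; i++ {
		ch <- i
	}
	batchLoop(ch, done, func(batch []int) {
		fmt.Println("got batch:", batch) // expected: got batch: [1 2 3 4 5]
		close(done)                      // stop after the first batch for the demo
	})
}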
64 changes: 0 additions & 64 deletions pkg/ddl/job_scheduler.go
@@ -58,7 +58,6 @@ var (
// is a new DDL job.
addingDDLJobNotifyKey = "/tidb/ddl/add_ddl_job_general"
dispatchLoopWaitingDuration = 1 * time.Second
localWorkerWaitingDuration = 10 * time.Millisecond
schedulerLoopRetryInterval = time.Second
)

@@ -77,16 +76,13 @@ func (t jobType) String() string {
return "general"
case jobTypeReorg:
return "reorg"
case jobTypeLocal:
return "local"
}
return "unknown job type: " + strconv.Itoa(int(t))
}

const (
jobTypeGeneral jobType = iota
jobTypeReorg
jobTypeLocal
)

type ownerListener struct {
@@ -246,21 +242,6 @@ func (s *jobScheduler) processJobDuringUpgrade(sess *sess.Session, job *model.Jo
return true, nil
}

// startLocalWorkerLoop starts the local worker loop to run the DDL job of v2.
func (d *ddl) startLocalWorkerLoop() {
for {
select {
case <-d.ctx.Done():
return
case jobW, ok := <-d.localJobCh:
if !ok {
return
}
d.delivery2LocalWorker(d.localWorkerPool, jobW)
}
}
}

func (s *jobScheduler) scheduleLoop() {
const retryInterval = 3 * time.Second
for {
@@ -466,51 +447,6 @@ func (s *jobScheduler) mustReloadSchemas() {
}
}

// delivery2LocalWorker runs the DDL job of v2 in local.
// send the result to the error channels in the task.
// delivery2Localworker owns the worker, need to put it back to the pool in this function.
func (d *ddl) delivery2LocalWorker(pool *workerPool, jobW *JobWrapper) {
job := jobW.Job
wk, err := pool.get()
if err != nil {
jobW.NotifyResult(err)
return
}
for wk == nil {
select {
case <-d.ctx.Done():
return
case <-time.After(localWorkerWaitingDuration):
}
wk, err = pool.get()
if err != nil {
jobW.NotifyResult(err)
return
}
}
d.wg.Run(func() {
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc()
defer func() {
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
}()

for i := int64(0); i < variable.GetDDLErrorCountLimit(); i++ {
err = wk.HandleLocalDDLJob(d.ddlCtx, job)
// since local the job is not inserted into the ddl job queue, we need to add retry logic here.
if err == nil || !isRetryableError(err) {
break
}
logutil.DDLLogger().Info("handle local ddl job", zap.Int64("retry times", i), zap.Error(err))
time.Sleep(time.Second)
}
pool.put(wk)
if err != nil {
logutil.DDLLogger().Info("handle ddl job failed", zap.Error(err), zap.Stringer("job", job))
}
jobW.NotifyResult(err)
})
}

// deliveryJob deliver the job to the worker to run it asynchronously.
// the worker will run the job until it's finished, paused or another owner takes
// over and finished it.
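For readers tracing the deleted local-mode path: delivery2LocalWorker above retried a job up to variable.GetDDLErrorCountLimit() times, stopping early on success or a non-retryable error and sleeping between attempts. Below is a stripped-down sketch of that retry shape; the names and error classifier are invented stand-ins, not TiDB APIs.

package main

import (
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient failure")

// isRetryable stands in for the real isRetryableError check.
func isRetryable(err error) bool { return errors.Is(err, errTransient) }

// runWithRetry mirrors the shape of the removed loop: stop on success or on a
// non-retryable error, otherwise sleep and try again, up to errCountLimit times.
func runWithRetry(errCountLimit int64, run func() error) error {
	var err error
	for i := int64(0); i < errCountLimit; i++ {
		err = run()
		if err == nil || !isRetryable(err) {
			break
		}
		fmt.Printf("attempt %d failed: %v\n", i, err)
		time.Sleep(10 * time.Millisecond)
	}
	return err
}

func main() {
	attempts := 0
	err := runWithRetry(5, func() error {
		attempts++
		if attempts < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "err:", err)
}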