Skip to content

Commit

Permalink
cherry pick pingcap#30401 to release-5.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
tangenta authored and ti-srebot committed Dec 16, 2021
1 parent 90ae7cc commit c0f9066
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 1 deletion.
4 changes: 4 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,10 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
d.limitJobCh <- task
// worker should restart to continue handling tasks in limitJobCh, and send back through task.err
err := <-task.err
if err != nil {
// The transaction of enqueuing job is failed.
return errors.Trace(err)
}

ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true

Expand Down
11 changes: 10 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) {
return errors.Trace(err)
}
}
failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr"))
}
})
return nil
})
var jobs string
Expand All @@ -294,7 +299,11 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerAddDDLJob, task.job.Type.String(),
metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}
logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs))
if err != nil {
logutil.BgLogger().Warn("[ddl] add DDL jobs failed", zap.String("jobs", jobs), zap.Error(err))
} else {
logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs))
}
}

// getHistoryDDLJob gets a DDL job with job's ID from history queue.
Expand Down
26 changes: 26 additions & 0 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,32 @@ func (s *testDDLSuite) TestColumnError(c *C) {
doDDLJobErr(c, dbInfo.ID, tblInfo.ID, model.ActionDropColumns, []interface{}{[]model.CIStr{model.NewCIStr("c5"), model.NewCIStr("c6")}, make([]bool, 2)}, ctx, d)
}

func (s *testDDLSerialSuite) TestAddBatchJobError(c *C) {
store := testCreateStore(c, "test_add_batch_job_error")
defer func() {
err := store.Close()
c.Assert(err, IsNil)
}()
d, err := testNewDDLAndStart(
context.Background(),
WithStore(store),
WithLease(testLease),
)
c.Assert(err, IsNil)
defer func() {
err := d.Stop()
c.Assert(err, IsNil)
}()
ctx := testNewContext(d)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockAddBatchDDLJobsErr", `return(true)`), IsNil)
// Test the job runner should not hang forever.
job := &model.Job{SchemaID: 1, TableID: 1}
err = d.doDDLJob(ctx, job)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "mockAddBatchDDLJobsErr")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockAddBatchDDLJobsErr"), IsNil)
}

func testCheckOwner(c *C, d *ddl, expectedVal bool) {
c.Assert(d.isOwner(), Equals, expectedVal)
}
Expand Down

0 comments on commit c0f9066

Please sign in to comment.