Skip to content

Commit

Permalink
infoschema/executor : add DDLJobs sys table (#14837)
Browse files Browse the repository at this point in the history
  • Loading branch information
reafans authored Mar 19, 2020
1 parent 3ced6bc commit 86d8e16
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 55 deletions.
5 changes: 5 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,11 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
extractor: v.Extractor.(*plannercore.SlowQueryExtractor),
},
}
case strings.ToLower(infoschema.TableDDLJobs):
return &DDLJobsReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
is: b.is,
}
}
}
tb, _ := b.is.TableByID(v.Table.ID)
Expand Down
132 changes: 79 additions & 53 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
Expand All @@ -38,6 +40,7 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -329,14 +332,79 @@ func (e *ShowDDLExec) Next(ctx context.Context, req *chunk.Chunk) error {
// ShowDDLJobsExec represent a show DDL jobs executor.
type ShowDDLJobsExec struct {
baseExecutor
DDLJobRetriever

cursor int
jobNumber int
is infoschema.InfoSchema
done bool
}

// DDLJobRetriever retrieve the DDLJobs.
type DDLJobRetriever struct {
runningJobs []*model.Job
historyJobIter *meta.LastJobIterator
cacheJobs []*model.Job
jobNumber int
cursor int
is infoschema.InfoSchema
done bool
activeRoles []*auth.RoleIdentity
cacheJobs []*model.Job
}

func (e *DDLJobRetriever) initial(txn kv.Transaction) error {
jobs, err := admin.GetDDLJobs(txn)
if err != nil {
return err
}
m := meta.NewMeta(txn)
e.historyJobIter, err = m.GetLastHistoryDDLJobsIterator()
if err != nil {
return err
}
e.runningJobs = jobs
e.cursor = 0
return nil
}

func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, checker privilege.Manager) {
schemaName := job.SchemaName
tableName := ""
finishTS := uint64(0)
if job.BinlogInfo != nil {
finishTS = job.BinlogInfo.FinishedTS
if job.BinlogInfo.TableInfo != nil {
tableName = job.BinlogInfo.TableInfo.Name.L
}
if len(schemaName) == 0 && job.BinlogInfo.DBInfo != nil {
schemaName = job.BinlogInfo.DBInfo.Name.L
}
}
// For compatibility, the old version of DDL Job wasn't store the schema name and table name.
if len(schemaName) == 0 {
schemaName = getSchemaName(e.is, job.SchemaID)
}
if len(tableName) == 0 {
tableName = getTableName(e.is, job.TableID)
}

// Check the privilege.
if checker != nil && !checker.RequestVerification(e.activeRoles, strings.ToLower(schemaName), strings.ToLower(tableName), "", mysql.AllPrivMask) {
return
}

req.AppendInt64(0, job.ID)
req.AppendString(1, schemaName)
req.AppendString(2, tableName)
req.AppendString(3, job.Type.String())
req.AppendString(4, job.SchemaState.String())
req.AppendInt64(5, job.SchemaID)
req.AppendInt64(6, job.TableID)
req.AppendInt64(7, job.RowCount)
req.AppendString(8, model.TSConvert2Time(job.StartTS).String())
if finishTS > 0 {
req.AppendString(9, model.TSConvert2Time(finishTS).String())
} else {
req.AppendString(9, "")
}
req.AppendString(10, job.State.String())
}

// ShowDDLJobQueriesExec represents a show DDL job queries executor.
Expand Down Expand Up @@ -404,21 +472,14 @@ func (e *ShowDDLJobsExec) Open(ctx context.Context) error {
if err != nil {
return err
}
jobs, err := admin.GetDDLJobs(txn)
if err != nil {
return err
}
e.DDLJobRetriever.is = e.is
if e.jobNumber == 0 {
e.jobNumber = admin.DefNumHistoryJobs
}

m := meta.NewMeta(txn)
e.historyJobIter, err = m.GetLastHistoryDDLJobsIterator()
err = e.DDLJobRetriever.initial(txn)
if err != nil {
return err
}
e.runningJobs = append(e.runningJobs, jobs...)
e.cursor = 0
return nil
}

Expand All @@ -429,17 +490,19 @@ func (e *ShowDDLJobsExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}
count := 0

// Append running ddl jobs.
if e.cursor < len(e.runningJobs) {
numCurBatch := mathutil.Min(req.Capacity(), len(e.runningJobs)-e.cursor)
for i := e.cursor; i < e.cursor+numCurBatch; i++ {
e.appendJobToChunk(req, e.runningJobs[i])
e.appendJobToChunk(req, e.runningJobs[i], nil)
}
e.cursor += numCurBatch
count += numCurBatch
}
var err error

// Append history ddl jobs.
var err error
if count < req.Capacity() {
num := req.Capacity() - count
remainNum := e.jobNumber - (e.cursor - len(e.runningJobs))
Expand All @@ -449,50 +512,13 @@ func (e *ShowDDLJobsExec) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
for _, job := range e.cacheJobs {
e.appendJobToChunk(req, job)
e.appendJobToChunk(req, job, nil)
}
e.cursor += len(e.cacheJobs)
}
return nil
}

func (e *ShowDDLJobsExec) appendJobToChunk(req *chunk.Chunk, job *model.Job) {
req.AppendInt64(0, job.ID)
schemaName := job.SchemaName
tableName := ""
finishTS := uint64(0)
if job.BinlogInfo != nil {
finishTS = job.BinlogInfo.FinishedTS
if job.BinlogInfo.TableInfo != nil {
tableName = job.BinlogInfo.TableInfo.Name.L
}
if len(schemaName) == 0 && job.BinlogInfo.DBInfo != nil {
schemaName = job.BinlogInfo.DBInfo.Name.L
}
}
// For compatibility, the old version of DDL Job wasn't store the schema name and table name.
if len(schemaName) == 0 {
schemaName = getSchemaName(e.is, job.SchemaID)
}
if len(tableName) == 0 {
tableName = getTableName(e.is, job.TableID)
}
req.AppendString(1, schemaName)
req.AppendString(2, tableName)
req.AppendString(3, job.Type.String())
req.AppendString(4, job.SchemaState.String())
req.AppendInt64(5, job.SchemaID)
req.AppendInt64(6, job.TableID)
req.AppendInt64(7, job.RowCount)
req.AppendString(8, model.TSConvert2Time(job.StartTS).String())
if finishTS > 0 {
req.AppendString(9, model.TSConvert2Time(finishTS).String())
} else {
req.AppendString(9, "")
}
req.AppendString(10, job.State.String())
}

func getSchemaName(is infoschema.InfoSchema, id int64) string {
var schemaName string
DBInfo, ok := is.SchemaByID(id)
Expand Down
62 changes: 62 additions & 0 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -825,6 +827,66 @@ func (e *memtableRetriever) setDataFromViews(ctx sessionctx.Context, schemas []*
e.rows = rows
}

// DDLJobsReaderExec executes DDLJobs information retrieving.
type DDLJobsReaderExec struct {
baseExecutor
DDLJobRetriever

cacheJobs []*model.Job
is infoschema.InfoSchema
}

// Open implements the Executor Next interface.
func (e *DDLJobsReaderExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}
e.DDLJobRetriever.is = e.is
e.activeRoles = e.ctx.GetSessionVars().ActiveRoles
err = e.DDLJobRetriever.initial(txn)
if err != nil {
return err
}
return nil
}

// Next implements the Executor Next interface.
func (e *DDLJobsReaderExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
checker := privilege.GetPrivilegeManager(e.ctx)
count := 0

// Append running DDL jobs.
if e.cursor < len(e.runningJobs) {
num := mathutil.Min(req.Capacity(), len(e.runningJobs)-e.cursor)
for i := e.cursor; i < e.cursor+num; i++ {
e.appendJobToChunk(req, e.runningJobs[i], checker)
req.AppendString(11, e.runningJobs[i].Query)
}
e.cursor += num
count += num
}
var err error

// Append history DDL jobs.
if count < req.Capacity() {
e.cacheJobs, err = e.historyJobIter.GetLastJobs(req.Capacity()-count, e.cacheJobs)
if err != nil {
return err
}
for _, job := range e.cacheJobs {
e.appendJobToChunk(req, job, checker)
req.AppendString(11, job.Query)
}
e.cursor += len(e.cacheJobs)
}
return nil
}

func (e *memtableRetriever) setDataFromEngines() {
var rows [][]types.Datum
rows = append(rows,
Expand Down
38 changes: 36 additions & 2 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (s *testInfoschemaTableSuite) TestSchemataTables(c *C) {
tk.MustQuery("select * from information_schema.SCHEMATA where schema_name='mysql';").Check(
testkit.Rows("def mysql utf8mb4 utf8mb4_bin <nil>"))

//test the privilege of new user for information_schema.schemata
// Test the privilege of new user for information_schema.schemata.
tk.MustExec("create user schemata_tester")
schemataTester := testkit.NewTestKit(c, s.store)
schemataTester.MustExec("use information_schema")
Expand All @@ -187,7 +187,7 @@ func (s *testInfoschemaTableSuite) TestSchemataTables(c *C) {
schemataTester.MustQuery("select * from information_schema.SCHEMATA where schema_name='INFORMATION_SCHEMA';").Check(
testkit.Rows("def INFORMATION_SCHEMA utf8mb4 utf8mb4_bin <nil>"))

//test the privilege of user with privilege of mysql for information_schema.schemata
// Test the privilege of user with privilege of mysql for information_schema.schemata.
tk.MustExec("CREATE ROLE r_mysql_priv;")
tk.MustExec("GRANT ALL PRIVILEGES ON mysql.* TO r_mysql_priv;")
tk.MustExec("GRANT r_mysql_priv TO schemata_tester;")
Expand Down Expand Up @@ -243,6 +243,40 @@ func (s *testInfoschemaTableSuite) TestCharacterSetCollations(c *C) {
testkit.Rows("utf8mb4_bin utf8mb4"))
}

func (s *testInfoschemaTableSuite) TestDDLJobs(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("create database if not exists test_ddl_jobs")
tk.MustQuery("select db_name, job_type from information_schema.DDL_JOBS limit 1").Check(
testkit.Rows("test_ddl_jobs create schema"))

tk.MustExec("use test_ddl_jobs")
tk.MustExec("create table t (a int);")
tk.MustQuery("select db_name, table_name, job_type from information_schema.DDL_JOBS where table_name = 't'").Check(
testkit.Rows("test_ddl_jobs t create table"))

tk.MustQuery("select job_type from information_schema.DDL_JOBS group by job_type having job_type = 'create table'").Check(
testkit.Rows("create table"))

// Test the privilege of new user for information_schema.DDL_JOBS.
tk.MustExec("create user DDL_JOBS_tester")
DDLJobsTester := testkit.NewTestKit(c, s.store)
DDLJobsTester.MustExec("use information_schema")
c.Assert(DDLJobsTester.Se.Auth(&auth.UserIdentity{
Username: "DDL_JOBS_tester",
Hostname: "127.0.0.1",
}, nil, nil), IsTrue)

// Test the privilege of user for information_schema.ddl_jobs.
DDLJobsTester.MustQuery("select DB_NAME, TABLE_NAME from information_schema.DDL_JOBS where DB_NAME = 'test_ddl_jobs' and TABLE_NAME = 't';").Check(
[][]interface{}{})
tk.MustExec("CREATE ROLE r_priv;")
tk.MustExec("GRANT ALL PRIVILEGES ON test_ddl_jobs.* TO r_priv;")
tk.MustExec("GRANT r_priv TO DDL_JOBS_tester;")
DDLJobsTester.MustExec("set role r_priv")
DDLJobsTester.MustQuery("select DB_NAME, TABLE_NAME from information_schema.DDL_JOBS where DB_NAME = 'test_ddl_jobs' and TABLE_NAME = 't';").Check(
testkit.Rows("test_ddl_jobs t"))
}

func (s *testInfoschemaTableSuite) TestKeyColumnUsage(c *C) {
tk := testkit.NewTestKit(c, s.store)

Expand Down
Loading

0 comments on commit 86d8e16

Please sign in to comment.