Skip to content

Commit

Permalink
ddl: fix alter add index on virtual column bug (#7575) (#8655)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored and zz-jason committed Dec 12, 2018
1 parent f3dc5ee commit 86b5ce5
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 25 deletions.
74 changes: 49 additions & 25 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
Expand All @@ -35,6 +36,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/timeutil"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -439,22 +441,22 @@ type indexRecord struct {
}

type addIndexWorker struct {
id int
ddlWorker *worker
batchCnt int
sessCtx sessionctx.Context
taskCh chan *reorgIndexTask
resultCh chan *addIndexResult
index table.Index
table table.Table
colFieldMap map[int64]*types.FieldType
closed bool
priority int
id int
ddlWorker *worker
batchCnt int
sessCtx sessionctx.Context
taskCh chan *reorgIndexTask
resultCh chan *addIndexResult
index table.Index
table table.Table
closed bool
priority int

// The following attributes are used to reduce memory allocation.
defaultVals []types.Datum
idxRecords []*indexRecord
rowMap map[int64]types.Datum
rowDecoder decoder.RowDecoder
idxKeyBufs [][]byte
batchCheckKeys []kv.Key
distinctCheckFlags []bool
Expand Down Expand Up @@ -493,8 +495,9 @@ func mergeAddIndexCtxToResult(taskCtx *addIndexTaskContext, result *addIndexResu
result.scanCount += taskCtx.scanCount
}

func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, indexInfo *model.IndexInfo, colFieldMap map[int64]*types.FieldType) *addIndexWorker {
func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, indexInfo *model.IndexInfo, decodeColMap map[int64]decoder.Column) *addIndexWorker {
index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)
rowDecoder := decoder.NewRowDecoder(t.Cols(), decodeColMap)
return &addIndexWorker{
id: id,
ddlWorker: worker,
Expand All @@ -504,10 +507,10 @@ func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t tab
resultCh: make(chan *addIndexResult, 1),
index: index,
table: t,
colFieldMap: colFieldMap,
rowDecoder: rowDecoder,
priority: kv.PriorityLow,
defaultVals: make([]types.Datum, len(t.Cols())),
rowMap: make(map[int64]types.Datum, len(colFieldMap)),
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
}
}

Expand All @@ -523,7 +526,8 @@ func (w *addIndexWorker) getIndexRecord(handle int64, recordKey []byte, rawRecor
t := w.table
cols := t.Cols()
idxInfo := w.index.Meta()
_, err := tablecodec.DecodeRowWithMap(rawRecord, w.colFieldMap, time.UTC, w.rowMap)
sysZone := timeutil.SystemLocation()
_, err := w.rowDecoder.DecodeAndEvalRowWithMap(w.sessCtx, rawRecord, time.UTC, sysZone, w.rowMap)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -552,9 +556,8 @@ func (w *addIndexWorker) getIndexRecord(handle int64, recordKey []byte, rawRecor

if idxColumnVal.Kind() == types.KindMysqlTime {
t := idxColumnVal.GetMysqlTime()
zone := timeutil.SystemLocation()
if t.Type == mysql.TypeTimestamp && zone != time.UTC {
err := t.ConvertTimeZone(zone, time.UTC)
if t.Type == mysql.TypeTimestamp && sysZone != time.UTC {
err := t.ConvertTimeZone(sysZone, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -851,14 +854,31 @@ func (w *addIndexWorker) run(d *ddlCtx) {
log.Infof("[ddl-reorg] worker[%v] exit", w.id)
}

func makeupIndexColFieldMap(t table.Table, indexInfo *model.IndexInfo) map[int64]*types.FieldType {
func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table, indexInfo *model.IndexInfo) (map[int64]decoder.Column, error) {
cols := t.Cols()
colFieldMap := make(map[int64]*types.FieldType, len(indexInfo.Columns))
decodeColMap := make(map[int64]decoder.Column, len(indexInfo.Columns))
for _, v := range indexInfo.Columns {
col := cols[v.Offset]
colFieldMap[col.ID] = &col.FieldType
tpExpr := decoder.Column{
Info: col.ToInfo(),
}
if col.IsGenerated() && !col.GeneratedStored {
for _, c := range cols {
if _, ok := col.Dependences[c.Name.L]; ok {
decodeColMap[c.ID] = decoder.Column{
Info: c.ToInfo(),
}
}
}
e, err := expression.ParseSimpleExprCastWithTableInfo(sessCtx, col.GeneratedExprString, t.Meta(), &col.FieldType)
if err != nil {
return nil, errors.Trace(err)
}
tpExpr.GenExpr = e
}
decodeColMap[col.ID] = tpExpr
}
return colFieldMap
return decodeColMap, nil
}

// splitTableRanges uses PD region's key ranges to split the backfilling table key range space,
Expand Down Expand Up @@ -1061,19 +1081,23 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addInd
func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo)
colFieldMap := makeupIndexColFieldMap(t, indexInfo)
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t, indexInfo)
if err != nil {
return errors.Trace(err)
}

// variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt".
workerCnt := variable.GetDDLReorgWorkerCounter()
idxWorkers := make([]*addIndexWorker, workerCnt)
for i := 0; i < int(workerCnt); i++ {
sessCtx := newContext(reorgInfo.d.store)
idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, colFieldMap)
idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
idxWorkers[i].priority = job.Priority
go idxWorkers[i].run(reorgInfo.d)
}
defer closeAddIndexWorkers(idxWorkers)
err := w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo)
err = w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo)
return errors.Trace(err)
}

Expand Down
27 changes: 27 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,33 @@ func (s *testSuite) TestAdmin(c *C) {
tk.MustExec("ALTER TABLE t1 ADD COLUMN c4 bit(10) default 127;")
tk.MustExec("ALTER TABLE t1 ADD INDEX idx3 (c4);")
tk.MustExec("admin check table t1;")

// For add index on virtual column
tk.MustExec("drop table if exists t1;")
tk.MustExec(`create table t1 (
a int as (JSON_EXTRACT(k,'$.a')),
c double as (JSON_EXTRACT(k,'$.c')),
d decimal(20,10) as (JSON_EXTRACT(k,'$.d')),
e char(10) as (JSON_EXTRACT(k,'$.e')),
f date as (JSON_EXTRACT(k,'$.f')),
g time as (JSON_EXTRACT(k,'$.g')),
h datetime as (JSON_EXTRACT(k,'$.h')),
i timestamp as (JSON_EXTRACT(k,'$.i')),
j year as (JSON_EXTRACT(k,'$.j')),
k json);`)

tk.MustExec("insert into t1 set k='{\"a\": 100,\"c\":1.234,\"d\":1.2340000000,\"e\":\"abcdefg\",\"f\":\"2018-09-28\",\"g\":\"12:59:59\",\"h\":\"2018-09-28 12:59:59\",\"i\":\"2018-09-28 16:40:33\",\"j\":\"2018\"}';")
tk.MustExec("alter table t1 add index idx_a(a);")
tk.MustExec("alter table t1 add index idx_c(c);")
tk.MustExec("alter table t1 add index idx_d(d);")
tk.MustExec("alter table t1 add index idx_e(e);")
tk.MustExec("alter table t1 add index idx_f(f);")
tk.MustExec("alter table t1 add index idx_g(g);")
tk.MustExec("alter table t1 add index idx_h(h);")
tk.MustExec("alter table t1 add index idx_j(j);")
tk.MustExec("alter table t1 add index idx_i(i);")
tk.MustExec("alter table t1 add index idx_m(a,c,d,e,f,g,h,i,j);")
tk.MustExec("admin check table t1;")
}

func (s *testSuite) fillData(tk *testkit.TestKit, table string) {
Expand Down
11 changes: 11 additions & 0 deletions expression/simple_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ func ParseSimpleExprWithTableInfo(ctx sessionctx.Context, exprStr string, tableI
return RewriteSimpleExprWithTableInfo(ctx, tableInfo, expr)
}

// ParseSimpleExprCastWithTableInfo parses simple expression string to Expression.
// And the expr returns will cast to the target type.
func ParseSimpleExprCastWithTableInfo(ctx sessionctx.Context, exprStr string, tableInfo *model.TableInfo, targetFt *types.FieldType) (Expression, error) {
e, err := ParseSimpleExprWithTableInfo(ctx, exprStr, tableInfo)
if err != nil {
return nil, errors.Trace(err)
}
e = BuildCastFunction(ctx, e, targetFt)
return e, nil
}

// RewriteSimpleExprWithTableInfo rewrites simple ast.ExprNode to expression.Expression.
func RewriteSimpleExprWithTableInfo(ctx sessionctx.Context, tbl *model.TableInfo, expr ast.ExprNode) (Expression, error) {
dbName := model.NewCIStr(ctx.GetSessionVars().CurrentDB)
Expand Down
112 changes: 112 additions & 0 deletions util/rowDecoder/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package decoder

import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
)

// Column contains the info and generated expr of column.
type Column struct {
Info *model.ColumnInfo
GenExpr expression.Expression
}

// RowDecoder decodes a byte slice into datums and eval the generated column value.
type RowDecoder struct {
mutRow chunk.MutRow
columns map[int64]Column
colTypes map[int64]*types.FieldType
haveGenColumn bool
}

// NewRowDecoder returns a new RowDecoder.
func NewRowDecoder(cols []*table.Column, decodeColMap map[int64]Column) RowDecoder {
colFieldMap := make(map[int64]*types.FieldType, len(decodeColMap))
haveGenCol := false
for id, col := range decodeColMap {
colFieldMap[id] = &col.Info.FieldType
if col.GenExpr != nil {
haveGenCol = true
}
}
if !haveGenCol {
return RowDecoder{
colTypes: colFieldMap,
}
}

tps := make([]*types.FieldType, len(cols))
for _, col := range cols {
tps[col.Offset] = &col.FieldType
}
return RowDecoder{
mutRow: chunk.MutRowFromTypes(tps),
columns: decodeColMap,
colTypes: colFieldMap,
haveGenColumn: haveGenCol,
}
}

// DecodeAndEvalRowWithMap decodes a byte slice into datums and evaluates the generated column value.
func (rd RowDecoder) DecodeAndEvalRowWithMap(ctx sessionctx.Context, b []byte, decodeLoc, sysLoc *time.Location, row map[int64]types.Datum) (map[int64]types.Datum, error) {
row, err := tablecodec.DecodeRowWithMap(b, rd.colTypes, decodeLoc, row)
if err != nil {
return nil, errors.Trace(err)
}
if !rd.haveGenColumn {
return row, nil
}

for id, v := range row {
rd.mutRow.SetValue(rd.columns[id].Info.Offset, v.GetValue())
}
for id, col := range rd.columns {
if col.GenExpr == nil {
continue
}
// Eval the column value
val, err := col.GenExpr.Eval(rd.mutRow.ToRow())
if err != nil {
return nil, errors.Trace(err)
}
val, err = table.CastValue(ctx, val, col.Info)
if err != nil {
return nil, errors.Trace(err)
}

if val.Kind() == types.KindMysqlTime {
t := val.GetMysqlTime()
if t.Type == mysql.TypeTimestamp && sysLoc != time.UTC {
err := t.ConvertTimeZone(sysLoc, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
val.SetMysqlTime(t)
}
}
row[id] = val
}
return row, nil
}

0 comments on commit 86b5ce5

Please sign in to comment.