Skip to content

Commit

Permalink
*: make chunk.SwapColumn private (#57274)
Browse files Browse the repository at this point in the history
close #55885
  • Loading branch information
windtalker authored Nov 13, 2024
1 parent eea72ec commit 65281ad
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 179 deletions.
7 changes: 5 additions & 2 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,12 +811,15 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) exec.Executor
end: v.Offset + v.Count,
}

childUsedSchemaLen := v.Children()[0].Schema().Len()
childSchemaLen := v.Children()[0].Schema().Len()
childUsedSchema := markChildrenUsedCols(v.Schema().Columns, v.Children()[0].Schema())[0]
e.columnIdxsUsedByChild = make([]int, 0, len(childUsedSchema))
e.columnIdxsUsedByChild = append(e.columnIdxsUsedByChild, childUsedSchema...)
if len(e.columnIdxsUsedByChild) == childUsedSchemaLen {
if len(e.columnIdxsUsedByChild) == childSchemaLen {
e.columnIdxsUsedByChild = nil // indicates that all columns are used. LimitExec will improve performance for this condition.
} else {
// construct a column swap helper to do the inline projection
e.columnSwapHelper = chunk.NewColumnSwapHelper(e.columnIdxsUsedByChild)
}
return e
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ type LimitExec struct {

// columnIdxsUsedByChild keeps the column indexes of the child executor that are used for inline projection
columnIdxsUsedByChild []int
columnSwapHelper *chunk.ColumnSwapHelper

// Log the close time when opentracing is enabled.
span opentracing.Span
Expand Down Expand Up @@ -480,10 +481,9 @@ func (e *LimitExec) Next(ctx context.Context, req *chunk.Chunk) error {
e.cursor += batchSize

if e.columnIdxsUsedByChild != nil {
for i, childIdx := range e.columnIdxsUsedByChild {
if err = req.SwapColumn(i, e.childResult, childIdx); err != nil {
return err
}
err = e.columnSwapHelper.SwapColumns(e.childResult, req)
if err != nil {
return err
}
} else {
req.SwapColumns(e.childResult)
Expand Down
133 changes: 9 additions & 124 deletions pkg/expression/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,127 +15,10 @@
package expression

import (
"sync/atomic"

"github.com/pingcap/tidb/pkg/expression/exprctx"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/disjointset"
"github.com/pingcap/tidb/pkg/util/intest"
)

// columnEvaluator evaluates plain "Column" expressions by swapping columns
// from the input chunk into the output chunk (see run).
type columnEvaluator struct {
// inputIdxToOutputIdxes maps an input column index to every output column
// index that is produced from that input column.
inputIdxToOutputIdxes map[int][]int
// mergedInputIdxToOutputIdxes is only determined in runtime when saw the input chunk.
// It is built from inputIdxToOutputIdxes by mergeInputIdxToOutputIdxes and
// published with a compare-and-swap, so a nil pointer means "not built yet".
mergedInputIdxToOutputIdxes atomic.Pointer[map[int][]int]
}

// run evaluates the "Column" expressions by moving columns from input into
// output.
//
// NOTE: it swaps columns out of the input Chunk and therefore changes the
// content of the input Chunk, so it should be called after all the other
// expressions are evaluated.
func (e *columnEvaluator) run(ctx EvalContext, input, output *chunk.Chunk) error {
	// The merged mapping depends on the structure of the input chunk, so it is
	// built lazily the first time an input chunk is seen.
	merged := e.mergedInputIdxToOutputIdxes.Load()
	if merged == nil {
		e.mergeInputIdxToOutputIdxes(input, e.inputIdxToOutputIdxes)
		merged = e.mergedInputIdxToOutputIdxes.Load()
	}
	for srcIdx, dstIdxes := range *merged {
		head := dstIdxes[0]
		// The first output column takes the input column itself.
		if err := output.SwapColumn(head, input, srcIdx); err != nil {
			return err
		}
		// Every remaining output column is made a reference to the first one.
		for _, dstIdx := range dstIdxes[1:] {
			output.MakeRef(head, dstIdx)
		}
	}
	return nil
}

// mergeInputIdxToOutputIdxes merges separate inputIdxToOutputIdxes entries when column references
// are detected within the input chunk. This process ensures consistent handling of columns derived
// from the same original source.
//
// Consider the following scenario:
//
// Initial scan operation produces a column 'a':
//
// scan: a (addr: ???)
//
// This column 'a' is used in the first projection (proj1) to create two columns a1 and a2, both referencing 'a':
//
// proj1
// / \
// / \
// / \
// a1 (addr: 0xe) a2 (addr: 0xe)
// / \
// / \
// / \
// proj2 proj2
// / \ / \
// / \ / \
// a3 a4 a5 a6
//
// (addr: 0xe) (addr: 0xe) (addr: 0xe) (addr: 0xe)
//
// Here, a1 and a2 share the same address (0xe), indicating they reference the same data from the original 'a'.
//
// When moving to the second projection (proj2), the system tries to project these columns further:
// - The first set (left side) consists of a3 and a4, derived from a1, both retaining the address (0xe).
// - The second set (right side) consists of a5 and a6, derived from a2, also starting with address (0xe).
//
// When proj1 is complete, the output chunk contains two columns [a1, a2], both derived from the single column 'a' from the scan.
// Since both a1 and a2 are column references with the same address (0xe), they are treated as referencing the same data.
//
// In proj2, two separate <inputIdx, []outputIdxes> items are created:
// - <0, [0,1]>: This means the 0th input column (a1) is projected twice, into the 0th and 1st columns of the output chunk.
// - <1, [2,3]>: This means the 1st input column (a2) is projected twice, into the 2nd and 3rd columns of the output chunk.
//
// Due to the column swapping logic in each projection, after applying the <0, [0,1]> projection,
// the addresses for a1 and a2 may become swapped or invalid:
//
// proj1: a1 (addr: invalid) a2 (addr: invalid)
//
// This can lead to issues in proj2, where further operations on these columns may be unsafe:
//
// proj2: a3 (addr: 0xe) a4 (addr: 0xe) a5 (addr: ???) a6 (addr: ???)
//
// Therefore, it's crucial to identify and merge the original column references early, ensuring
// the final inputIdxToOutputIdxes mapping accurately reflects the shared origins of the data.
// For instance, <0, [0,1,2,3]> indicates that the 0th input column (original 'a') is referenced
// by all four output columns in the final output.
//
// mergeInputIdxToOutputIdxes merges inputIdxToOutputIdxes based on detected column references.
// This ensures that columns with the same reference are correctly handled in the output chunk.
func (e *columnEvaluator) mergeInputIdxToOutputIdxes(input *chunk.Chunk, inputIdxToOutputIdxes map[int][]int) {
	numCols := input.NumCols()
	aliasSet := disjointset.NewSet[int](4)
	alreadyGrouped := make([]bool, numCols)
	// Find columns of the input chunk that are references to each other by
	// comparing the column pointers, and put them into the same set.
	for left := 0; left < numCols; left++ {
		if alreadyGrouped[left] {
			continue
		}
		for right := left + 1; right < numCols; right++ {
			if input.Column(left) != input.Column(right) {
				continue
			}
			alreadyGrouped[right] = true
			aliasSet.Union(left, right)
		}
	}
	// Collect the output indexes of all input columns that fall into the same
	// set under a single key.
	merged := make(map[int][]int, len(inputIdxToOutputIdxes))
	for inputIdx, outputIdxes := range inputIdxToOutputIdxes {
		// FindRoot returns the set's internal offset, not a column index, so it
		// has to be translated back with FindVal.
		rootOffset := aliasSet.FindRoot(inputIdx)
		rootCol, found := aliasSet.FindVal(rootOffset)
		intest.Assert(found)
		merged[rootCol] = append(merged[rootCol], outputIdxes...)
	}
	// Publish the merged mapping atomically. If the compare-and-swap fails,
	// another worker has already stored a mapping in the meantime.
	e.mergedInputIdxToOutputIdxes.CompareAndSwap(nil, &merged)
}

type defaultEvaluator struct {
outputIdxes []int
exprs []Expression
Expand Down Expand Up @@ -201,8 +84,8 @@ func GetOptionalEvalPropsForExpr(expr Expression) exprctx.OptionalEvalPropKeySet
// It separates them to "column" and "other" expressions and evaluates "other"
// expressions before "column" expressions.
type EvaluatorSuite struct {
*columnEvaluator // Evaluator for column expressions.
*defaultEvaluator // Evaluator for other expressions.
ColumnSwapHelper *chunk.ColumnSwapHelper // Evaluator for column expressions.
*defaultEvaluator // Evaluator for other expressions.
}

// NewEvaluatorSuite creates an EvaluatorSuite to evaluate all the exprs.
Expand All @@ -212,11 +95,11 @@ func NewEvaluatorSuite(exprs []Expression, avoidColumnEvaluator bool) *Evaluator

for i := 0; i < len(exprs); i++ {
if col, isCol := exprs[i].(*Column); isCol && !avoidColumnEvaluator {
if e.columnEvaluator == nil {
e.columnEvaluator = &columnEvaluator{inputIdxToOutputIdxes: make(map[int][]int)}
if e.ColumnSwapHelper == nil {
e.ColumnSwapHelper = &chunk.ColumnSwapHelper{InputIdxToOutputIdxes: make(map[int][]int)}
}
inputIdx, outputIdx := col.Index, i
e.columnEvaluator.inputIdxToOutputIdxes[inputIdx] = append(e.columnEvaluator.inputIdxToOutputIdxes[inputIdx], outputIdx)
e.ColumnSwapHelper.InputIdxToOutputIdxes[inputIdx] = append(e.ColumnSwapHelper.InputIdxToOutputIdxes[inputIdx], outputIdx)
continue
}
if e.defaultEvaluator == nil {
Expand Down Expand Up @@ -250,8 +133,10 @@ func (e *EvaluatorSuite) Run(ctx EvalContext, vecEnabled bool, input, output *ch
}
}

if e.columnEvaluator != nil {
return e.columnEvaluator.run(ctx, input, output)
// NOTE: It should be called after all the other expressions are evaluated
// since it will change the content of the input Chunk.
if e.ColumnSwapHelper != nil {
return e.ColumnSwapHelper.SwapColumns(input, output)
}
return nil
}
Expand Down
40 changes: 0 additions & 40 deletions pkg/expression/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package expression

import (
"slices"
"testing"
"time"

Expand Down Expand Up @@ -659,42 +658,3 @@ func TestOptionalProp(t *testing.T) {
require.Equal(t, exprctx.OptPropCurrentUser.AsPropKeySet()|exprctx.OptPropDDLOwnerInfo.AsPropKeySet()|
exprctx.OptPropAdvisoryLock.AsPropKeySet(), evalSuit.RequiredOptionalEvalProps())
}

// TestMergeInputIdxToOutputIdxes checks that columnEvaluator merges the
// output indexes of input columns that reference each other, so that all of
// them end up pointing at one column in the output chunk.
func TestMergeInputIdxToOutputIdxes(t *testing.T) {
	ctx := createContext(t)
	columnEval := columnEvaluator{inputIdxToOutputIdxes: map[int][]int{
		// The 0th input column is referred to by the 0th and 1st output columns.
		0: {0, 1},
		// The 1st input column is referred to by the 2nd and 3rd output columns.
		1: {2, 3},
	}}
	longlong := func() *types.FieldType { return types.NewFieldType(mysql.TypeLonglong) }

	input := chunk.NewEmptyChunk([]*types.FieldType{longlong(), longlong()})
	input.AppendInt64(0, 99)
	// Make the input chunk's 0th and 1st columns reference each other.
	input.MakeRef(0, 1)

	// chunk:  col1 <---(ref) col2
	// proj:   col1 -> {col1, col2}, col2 -> {col3, col4}
	//
	// In the original case, after handling inputIdxToOutputIdxes[0] the
	// original col2 would be a nil pointer, which made the following col3/col4
	// reference projection invalid.
	//
	// After the fix, the merged mapping should be:
	// inputIdxToOutputIdxes[0]: {0, 1, 2, 3}
	output := chunk.NewEmptyChunk([]*types.FieldType{longlong(), longlong(), longlong(), longlong()})

	require.NoError(t, columnEval.run(ctx, input, output))
	// All four output columns are references pointing to the first one.
	require.Equal(t, output.Column(0), output.Column(1))
	require.Equal(t, output.Column(1), output.Column(2))
	require.Equal(t, output.Column(2), output.Column(3))
	require.Equal(t, output.GetRow(0).GetInt64(0), int64(99))

	merged := *columnEval.mergedInputIdxToOutputIdxes.Load()
	require.Equal(t, len(merged), 1)
	// The append order depends on map iteration, so sort before comparing.
	slices.Sort(merged[0])
	require.Equal(t, merged[0], []int{0, 1, 2, 3})
}
2 changes: 1 addition & 1 deletion pkg/expression/integration_test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 46,
shard_count = 47,
deps = [
"//pkg/config",
"//pkg/domain",
Expand Down
49 changes: 49 additions & 0 deletions pkg/expression/integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3903,3 +3903,52 @@ func TestIssue44706(t *testing.T) {
tk.MustQuery("SELECT t0.c2 FROM t0 WHERE ((-1)<=(~ ('n') = ANY (SELECT (NULL))))").Check(testkit.Rows())
tk.MustQuery("SELECT t0.c2 FROM t0 WHERE ((-1)<=(~ ('n') = ANY (SELECT MIN(t0.c2) FROM t0 WHERE false)))").Check(testkit.Rows())
}

// TestIssue55885 runs a query whose subquery selects the same column twice
// (c_a90ol as c3 and as c4) next to a window function under a LIMIT, while the
// outer query keeps only c3 under another LIMIT.
// The query result is not checked here.
// NOTE(review): presumably the test only verifies that the query runs without
// an error or panic; confirm that MustQuery fails the test on error.
func TestIssue55885(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t_jg8o (c_s int not null unique ,c__qy double ,c_z int not null ,c_a90ol text not null);")
// Fill the table with a fixed batch of rows; the double column contains NULLs
// as well as very large and very small values.
tk.MustExec("insert into t_jg8o (c_s, c__qy, c_z, c_a90ol) values" +
"(-975033779, 85.65, -355481284, 'gnip' ),(-2018599732, 85.86, 1617093413, 'm' )," +
"(-960107027, 4.6, -2042358076, 'y1q')," +
"(-3, 38.1, -1528586343, 'ex_2')," +
"(69386953, 32768.0, -62220810, 'tfkxjj5c')," +
"(587181689, -9223372036854775806.3, -1666156943, 'queemvgj')," +
"(-218561796, 85.2, -670390288, 'nf990nol')," +
"(858419954, 2147483646.0, -1649362344, 'won_9')," +
"(-1120115215, 22.100, 1509989939, 'w')," +
"(-1388119356, 94.32, -1694148464, 'gu4i4knyhm')," +
"(-1016230734, -4294967295.8, 1430313391, 's')," +
"(-1861825796, 36.52, -1457928755, 'j')," +
"(1963621165, 88.87, 18928603, 'gxbsloff' )," +
"(1492879828, cast(null as double), 759883041, 'zwue')," +
"(-1607192175, 12.36, 1669523024, 'qt5zch71a')," +
"(1534068569, 46.79, -392085130, 'bc')," +
"(155707446, 9223372036854775809.4, 1727199557, 'qyghenu9t6')," +
"(-1524976778, 75.99, 335492222, 'sdgde0z')," +
"(175403335, cast(null as double), -69711503, 'ja')," +
"(-272715456, 48.62, 753928713, 'ur')," +
"(-2035825967, 257.3, -1598426762, 'lmqmn')," +
"(-1178957955, 2147483648.100000, 1432554380, 'dqpb210')," +
"(-2056628646, 254.5, -1476177588, 'k41ajpt7x')," +
"(-914210874, 126.7, -421919910, 'x57ud7oy1')," +
"(-88586773, 1.2, 1568247510, 'drmxi8')," +
"(-834563269, -4294967296.7, 1163133933, 'wp')," +
"(-84490060, 54.13, -630289437, '_3_twecg5h')," +
" (267700893, 54.75, 370343042, 'n72')," +
"(552106333, 32766.2, 2365745, 's7tt')," +
"(643440707, 65536.8, -850412592, 'wmluxa9a')," +
"(1709853766, -4294967296.5, -21041749, 'obqj0uu5v')," +
"(-7, 80.88, 528792379, 'n5qr9m26i')," +
"(-456431629, 28.43, 1958788149, 'b')," +
"(-28841240, 11.86, -1089765168, 'pqg')," +
"(-807839288, 25.89, 504535500, 'cs3tkhs')," +
"(-52910064, 85.16, 354032882, '_ffjo67yxe')," +
"(1919869830, 81.81, -272247558, 'aj')," +
"(165434725, -2147483648.0, 11, 'xxnsf5')," +
"(3, -2147483648.7, 1616632952, 'g7t8tqyi')," +
"(1851859144, 70.73, -1105664209, 'qjfhjr');")

// The inner LIMIT keeps only c3 out of the duplicated c3/c4 columns, and the
// returned rows are intentionally left unchecked.
tk.MustQuery("SELECT subq_0.c3 as c1 FROM (select c_a90ol as c3, c_a90ol as c4, var_pop(cast(c__qy as double)) over (partition by c_a90ol, c_s order by c_z) as c5 from t_jg8o limit 65) as subq_0 LIMIT 37")
}
2 changes: 2 additions & 0 deletions pkg/util/chunk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ go_library(
"//pkg/parser/terror",
"//pkg/types",
"//pkg/util/checksum",
"//pkg/util/disjointset",
"//pkg/util/disk",
"//pkg/util/encrypt",
"//pkg/util/hack",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/syncutil",
Expand Down
6 changes: 4 additions & 2 deletions pkg/util/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,12 @@ func (c *Chunk) MakeRefTo(dstColIdx int, src *Chunk, srcColIdx int) error {
return nil
}

// SwapColumn swaps Column "c.columns[colIdx]" with Column
// swapColumn swaps Column "c.columns[colIdx]" with Column
// "other.columns[otherIdx]". If there exists columns refer to the Column to be
// swapped, we need to re-build the reference.
func (c *Chunk) SwapColumn(colIdx int, other *Chunk, otherIdx int) error {
// This function should not be used directly. If you want to swap columns between two chunks,
// use ColumnSwapHelper.SwapColumns instead.
func (c *Chunk) swapColumn(colIdx int, other *Chunk, otherIdx int) error {
if c.sel != nil || other.sel != nil {
return errors.New(msgErrSelNotNil)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/util/chunk/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,25 +640,25 @@ func TestSwapColumn(t *testing.T) {
checkRef()

// swap two chunk's columns
require.NoError(t, chk1.SwapColumn(0, chk2, 0))
require.NoError(t, chk1.swapColumn(0, chk2, 0))
checkRef()

require.NoError(t, chk1.SwapColumn(0, chk2, 0))
require.NoError(t, chk1.swapColumn(0, chk2, 0))
checkRef()

// swap reference and referenced columns
require.NoError(t, chk2.SwapColumn(1, chk2, 0))
require.NoError(t, chk2.swapColumn(1, chk2, 0))
checkRef()

// swap the same column in the same chunk
require.NoError(t, chk2.SwapColumn(1, chk2, 1))
require.NoError(t, chk2.swapColumn(1, chk2, 1))
checkRef()

// swap reference and another column
require.NoError(t, chk2.SwapColumn(1, chk2, 2))
require.NoError(t, chk2.swapColumn(1, chk2, 2))
checkRef()

require.NoError(t, chk2.SwapColumn(2, chk2, 0))
require.NoError(t, chk2.swapColumn(2, chk2, 0))
checkRef()
}

Expand Down
Loading

0 comments on commit 65281ad

Please sign in to comment.