From 84a61412c29b67868be62ca664b0b8f35d09a789 Mon Sep 17 00:00:00 2001 From: rebelice Date: Thu, 6 Jan 2022 18:21:23 +0800 Subject: [PATCH 1/3] suport aws query with TableScan and Selection for Where --- executor/builder.go | 47 +++++++++++++++++++++++++++++++++- executor/table_reader.go | 29 ++++++++++++++++++--- expression/explain.go | 55 ++++++++++++++++++++++++++++++++++++++++ expression/expression.go | 2 ++ parser/model/model.go | 10 ++++++++ util/ranger/types.go | 34 +++++++++++++++++++++++++ 6 files changed, 173 insertions(+), 4 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 4a4cc6fe54117..6995bd23b4763 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -17,6 +17,7 @@ package executor import ( "bytes" "context" + "github.com/pingcap/tipb/go-tipb" "math" "sort" "strconv" @@ -37,6 +38,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/infoschema" + intervalutil "github.com/pingcap/tidb/interval/util" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/ast" @@ -64,7 +66,6 @@ import ( "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/timeutil" - "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" ) @@ -3106,6 +3107,35 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe return gather } +func buildAWSQueryInfo(v *plannercore.PhysicalTableReader, id int64) *RestoreData { + info := RestoreData{Where: make([]string, 0)} + ts := v.GetTableScan() + for _, p := range v.TablePlans { + switch x := p.(type) { + case *plannercore.PhysicalTableScan: + info.Table = intervalutil.GetTablePartitionName(x.TableAsName.L, id) + info.DB = "test" + var unsignedIntHandle bool + if ts.Table.PKIsHandle { + if pkColInfo := ts.Table.GetPkColInfo(); pkColInfo != nil { + unsignedIntHandle = mysql.HasUnsignedFlag(pkColInfo.Flag) + } + } + for _, r := range x.Ranges { + if r.IsFullRange(unsignedIntHandle) { + continue + } + info.Where = append(info.Where, r.RestoreString(x.Table.GetPkColName())...) + } + case *plannercore.PhysicalSelection: + for _, c := range x.Conditions { + info.Where = append(info.Where, c.Restore()) + } + } + } + return &info +} + // buildTableReader builds a table reader executor. It first build a no range table reader, // and then update it ranges from table scan plan. func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) Executor { @@ -3142,6 +3172,10 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E sctx := b.ctx.GetSessionVars().StmtCtx sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID) + if ok, id := readFromS3(ts); ok { + ret.AWSQueryInfo = buildAWSQueryInfo(v, id) + } + if !b.ctx.GetSessionVars().UseDynamicPartitionPrune() { return ret } @@ -4560,6 +4594,17 @@ func getPhysicalTableID(t table.Table) int64 { return t.Meta().ID } +func readFromS3(t *plannercore.PhysicalTableScan) (bool, int64) { + if ok, id := t.IsPartition(); ok { + for _, p := range t.Table.Partition.Definitions { + if p.ID == id { + return p.Engine == kv.AWSS3Engine, p.ID + } + } + } + return false, 0 +} + func getPhysicalTableEngine(t table.Table) (int64, string) { if p, ok := t.(table.PhysicalTable); ok { pid := p.GetPhysicalID() diff --git a/executor/table_reader.go b/executor/table_reader.go index d02b5ebb744be..147ec857ceb38 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -15,6 +15,7 @@ package executor import ( + "bytes" "context" "fmt" "sort" @@ -24,7 +25,6 @@ import ( "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/interval/athena" - "github.com/pingcap/tidb/interval/util" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -116,9 +116,32 @@ type TableReaderExecutor struct { // extraPIDColumnIndex is used for partition reader to add an extra partition ID column. extraPIDColumnIndex offsetOptional + AWSQueryInfo *RestoreData awsQueryResult *awsathena.ResultSet } +type RestoreData struct { + DB string + Table string + Where []string +} + +func (d *RestoreData) String() string { + var buffer bytes.Buffer + fmt.Fprint(&buffer, "select * from ") + fmt.Fprint(&buffer, d.DB) + fmt.Fprint(&buffer, ".") + fmt.Fprint(&buffer, d.Table) + fmt.Fprint(&buffer, " where ") + for i, c := range d.Where { + if i != 0 { + fmt.Fprint(&buffer, " and ") + } + fmt.Fprint(&buffer, c) + } + return buffer.String() +} + // offsetOptional may be a positive integer, or invalid. type offsetOptional int @@ -387,8 +410,8 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [ } func (e *TableReaderExecutor) fetchResultFromAws(pid int64) error { - tableName := util.GetTablePartitionName(e.table.Meta().Name.L, pid) - query := fmt.Sprintf("SELECT * FROM \"%v\".\"%v\" ", "test", tableName) + query := e.AWSQueryInfo.String() + logutil.BgLogger().Info(fmt.Sprintf("[aws query] %v", query)) cli, err := athena.CreateCli("us-west-2") if err != nil { return nil diff --git a/expression/explain.go b/expression/explain.go index daffbde3879c0..e6280aad30b16 100644 --- a/expression/explain.go +++ b/expression/explain.go @@ -60,6 +60,61 @@ func (expr *ScalarFunction) explainInfo(normalized bool) string { return buffer.String() } +func (expr *ScalarFunction) Restore() string { + return expr.restore() +} + +func (expr *ScalarFunction) restore() string { + var buffer bytes.Buffer + switch expr.FuncName.L { + case ast.LT: + fmt.Fprint(&buffer, "(") + buffer.WriteString(expr.GetArgs()[0].Restore()) + fmt.Fprint(&buffer, " < ") + buffer.WriteString(expr.GetArgs()[1].Restore()) + fmt.Fprint(&buffer, ")") + case ast.GT: + fmt.Fprint(&buffer, "(") + buffer.WriteString(expr.GetArgs()[0].Restore()) + fmt.Fprint(&buffer, " > ") + buffer.WriteString(expr.GetArgs()[1].Restore()) + fmt.Fprint(&buffer, ")") + case ast.LE: + fmt.Fprint(&buffer, "(") + buffer.WriteString(expr.GetArgs()[0].Restore()) + fmt.Fprint(&buffer, " <= ") + buffer.WriteString(expr.GetArgs()[1].Restore()) + fmt.Fprint(&buffer, ")") + case ast.GE: + fmt.Fprint(&buffer, "(") + buffer.WriteString(expr.GetArgs()[0].Restore()) + fmt.Fprint(&buffer, " >= ") + buffer.WriteString(expr.GetArgs()[1].Restore()) + fmt.Fprint(&buffer, ")") + case ast.EQ: + fmt.Fprint(&buffer, "(") + buffer.WriteString(expr.GetArgs()[0].Restore()) + fmt.Fprint(&buffer, " = ") + buffer.WriteString(expr.GetArgs()[1].Restore()) + fmt.Fprint(&buffer, ")") + case ast.NE: + fmt.Fprint(&buffer, "(") + buffer.WriteString(expr.GetArgs()[0].Restore()) + fmt.Fprint(&buffer, " != ") + buffer.WriteString(expr.GetArgs()[1].Restore()) + fmt.Fprint(&buffer, ")") + } + return buffer.String() +} + +func (col *Column) Restore() string { + return col.ExplainInfo() +} + +func (expr *Constant) Restore() string { + return expr.ExplainInfo() +} + // ExplainNormalizedInfo implements the Expression interface. func (expr *ScalarFunction) ExplainNormalizedInfo() string { return expr.explainInfo(true) diff --git a/expression/expression.go b/expression/expression.go index 0fc49b1adbc49..ccb06298d4d9b 100644 --- a/expression/expression.go +++ b/expression/expression.go @@ -176,6 +176,8 @@ type Expression interface { // Column: ColumnFlag+encoded value // ScalarFunction: SFFlag+encoded function name + encoded arg_1 + encoded arg_2 + ... HashCode(sc *stmtctx.StatementContext) []byte + + Restore() string } // CNFExprs stands for a CNF expression. diff --git a/parser/model/model.go b/parser/model/model.go index 6e3d886dfff9c..98c46a52bced2 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -568,6 +568,16 @@ func (t *TableInfo) GetPkColInfo() *ColumnInfo { return nil } +func (t *TableInfo) GetPkColName() []string { + ret := make([]string, 0, 1) + for _, colInfo := range t.Columns { + if mysql.HasPriKeyFlag(colInfo.Flag) { + return append(ret, colInfo.Name.L) + } + } + return ret +} + func (t *TableInfo) GetAutoIncrementColInfo() *ColumnInfo { for _, colInfo := range t.Columns { if mysql.HasAutoIncrementFlag(colInfo.Flag) { diff --git a/util/ranger/types.go b/util/ranger/types.go index b932650588f86..a1f6ca1efaebc 100644 --- a/util/ranger/types.go +++ b/util/ranger/types.go @@ -181,6 +181,40 @@ func (ran *Range) String() string { return l + strings.Join(lowStrs, " ") + "," + strings.Join(highStrs, " ") + r } +func (ran *Range) RestoreString(col []string) []string { + if len(col) != len(ran.LowVal) { + return nil + } + lowStrs := make([]string, 0, len(ran.LowVal)) + for _, d := range ran.LowVal { + lowStrs = append(lowStrs, formatDatum(d, true)) + } + highStrs := make([]string, 0, len(ran.LowVal)) + for _, d := range ran.HighVal { + highStrs = append(highStrs, formatDatum(d, false)) + } + + ret := make([]string, 0, len(col)*2) + + for i := range col { + if strings.Compare(lowStrs[i], "-inf") != 0 { + if ran.LowExclude { + ret = append(ret, fmt.Sprintf("%v > %v", col[i], lowStrs[i])) + } else { + ret = append(ret, fmt.Sprintf("%v >= %v", col[i], lowStrs[i])) + } + } + if strings.Compare(highStrs[i], "+inf") != 0 { + if ran.HighExclude { + ret = append(ret, fmt.Sprintf("%v < %v", col[i], highStrs[i])) + } else { + ret = append(ret, fmt.Sprintf("%v <= %v", col[i], highStrs[i])) + } + } + } + return ret +} + // Encode encodes the range to its encoded value. func (ran *Range) Encode(sc *stmtctx.StatementContext, lowBuffer, highBuffer []byte) ([]byte, []byte, error) { var err error From 1c01a0c3e1825b5a7ecb93216a62c44bc280ceaf Mon Sep 17 00:00:00 2001 From: rebelice Date: Thu, 6 Jan 2022 18:48:55 +0800 Subject: [PATCH 2/3] fix table name --- executor/builder.go | 2 +- expression/explain.go | 15 +++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 6995bd23b4763..d2152cd6aa298 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3113,7 +3113,7 @@ func buildAWSQueryInfo(v *plannercore.PhysicalTableReader, id int64) *RestoreDat for _, p := range v.TablePlans { switch x := p.(type) { case *plannercore.PhysicalTableScan: - info.Table = intervalutil.GetTablePartitionName(x.TableAsName.L, id) + info.Table = intervalutil.GetTablePartitionName(x.Table.Name.L, id) info.DB = "test" var unsignedIntHandle bool if ts.Table.PKIsHandle { diff --git a/expression/explain.go b/expression/explain.go index e6280aad30b16..755f3148cfef8 100644 --- a/expression/explain.go +++ b/expression/explain.go @@ -17,12 +17,11 @@ package expression import ( "bytes" "fmt" - "sort" - "strings" - "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "sort" + "strings" ) // ExplainInfo implements the Expression interface. @@ -120,9 +119,17 @@ func (expr *ScalarFunction) ExplainNormalizedInfo() string { return expr.explainInfo(true) } +func onlyColumnName(s string) string { + v := strings.Split(s, ".") + if len(v) == 0 { + return "" + } + return v[len(v)-1] +} + // ExplainInfo implements the Expression interface. func (col *Column) ExplainInfo() string { - return col.String() + return onlyColumnName(col.String()) } // ExplainNormalizedInfo implements the Expression interface. From 5812cfea2da38e31124c69a54563dd72a2c5a33c Mon Sep 17 00:00:00 2001 From: rebelice Date: Thu, 6 Jan 2022 19:25:34 +0800 Subject: [PATCH 3/3] fix table name --- executor/builder.go | 4 +++- expression/explain.go | 52 +++++++++++++++++++--------------------- expression/expression.go | 2 +- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index d2152cd6aa298..c3236f5dfa33f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3110,11 +3110,13 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe func buildAWSQueryInfo(v *plannercore.PhysicalTableReader, id int64) *RestoreData { info := RestoreData{Where: make([]string, 0)} ts := v.GetTableScan() + var tableInfo *model.TableInfo for _, p := range v.TablePlans { switch x := p.(type) { case *plannercore.PhysicalTableScan: info.Table = intervalutil.GetTablePartitionName(x.Table.Name.L, id) info.DB = "test" + tableInfo = x.Table var unsignedIntHandle bool if ts.Table.PKIsHandle { if pkColInfo := ts.Table.GetPkColInfo(); pkColInfo != nil { @@ -3129,7 +3131,7 @@ func buildAWSQueryInfo(v *plannercore.PhysicalTableReader, id int64) *RestoreDat } case *plannercore.PhysicalSelection: for _, c := range x.Conditions { - info.Where = append(info.Where, c.Restore()) + info.Where = append(info.Where, c.Restore(tableInfo)) } } } diff --git a/expression/explain.go b/expression/explain.go index 755f3148cfef8..132005b2a0cdc 100644 --- a/expression/explain.go +++ b/expression/explain.go @@ -18,6 +18,7 @@ import ( "bytes" "fmt" "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "sort" @@ -59,58 +60,63 @@ func (expr *ScalarFunction) explainInfo(normalized bool) string { return buffer.String() } -func (expr *ScalarFunction) Restore() string { - return expr.restore() +func (expr *ScalarFunction) Restore(t *model.TableInfo) string { + return expr.restore(t) } -func (expr *ScalarFunction) restore() string { +func (expr *ScalarFunction) restore(t *model.TableInfo) string { var buffer bytes.Buffer switch expr.FuncName.L { case ast.LT: fmt.Fprint(&buffer, "(") - buffer.WriteString(expr.GetArgs()[0].Restore()) + buffer.WriteString(expr.GetArgs()[0].Restore(t)) fmt.Fprint(&buffer, " < ") - buffer.WriteString(expr.GetArgs()[1].Restore()) + buffer.WriteString(expr.GetArgs()[1].Restore(t)) fmt.Fprint(&buffer, ")") case ast.GT: fmt.Fprint(&buffer, "(") - buffer.WriteString(expr.GetArgs()[0].Restore()) + buffer.WriteString(expr.GetArgs()[0].Restore(t)) fmt.Fprint(&buffer, " > ") - buffer.WriteString(expr.GetArgs()[1].Restore()) + buffer.WriteString(expr.GetArgs()[1].Restore(t)) fmt.Fprint(&buffer, ")") case ast.LE: fmt.Fprint(&buffer, "(") - buffer.WriteString(expr.GetArgs()[0].Restore()) + buffer.WriteString(expr.GetArgs()[0].Restore(t)) fmt.Fprint(&buffer, " <= ") - buffer.WriteString(expr.GetArgs()[1].Restore()) + buffer.WriteString(expr.GetArgs()[1].Restore(t)) fmt.Fprint(&buffer, ")") case ast.GE: fmt.Fprint(&buffer, "(") - buffer.WriteString(expr.GetArgs()[0].Restore()) + buffer.WriteString(expr.GetArgs()[0].Restore(t)) fmt.Fprint(&buffer, " >= ") - buffer.WriteString(expr.GetArgs()[1].Restore()) + buffer.WriteString(expr.GetArgs()[1].Restore(t)) fmt.Fprint(&buffer, ")") case ast.EQ: fmt.Fprint(&buffer, "(") - buffer.WriteString(expr.GetArgs()[0].Restore()) + buffer.WriteString(expr.GetArgs()[0].Restore(t)) fmt.Fprint(&buffer, " = ") - buffer.WriteString(expr.GetArgs()[1].Restore()) + buffer.WriteString(expr.GetArgs()[1].Restore(t)) fmt.Fprint(&buffer, ")") case ast.NE: fmt.Fprint(&buffer, "(") - buffer.WriteString(expr.GetArgs()[0].Restore()) + buffer.WriteString(expr.GetArgs()[0].Restore(t)) fmt.Fprint(&buffer, " != ") - buffer.WriteString(expr.GetArgs()[1].Restore()) + buffer.WriteString(expr.GetArgs()[1].Restore(t)) fmt.Fprint(&buffer, ")") } return buffer.String() } -func (col *Column) Restore() string { - return col.ExplainInfo() +func (col *Column) Restore(t *model.TableInfo) string { + for _, c := range t.Columns { + if c.ID == col.ID { + return c.Name.L + } + } + return "" } -func (expr *Constant) Restore() string { +func (expr *Constant) Restore(t *model.TableInfo) string { return expr.ExplainInfo() } @@ -119,17 +125,9 @@ func (expr *ScalarFunction) ExplainNormalizedInfo() string { return expr.explainInfo(true) } -func onlyColumnName(s string) string { - v := strings.Split(s, ".") - if len(v) == 0 { - return "" - } - return v[len(v)-1] -} - // ExplainInfo implements the Expression interface. func (col *Column) ExplainInfo() string { - return onlyColumnName(col.String()) + return col.String() } // ExplainNormalizedInfo implements the Expression interface. diff --git a/expression/expression.go b/expression/expression.go index ccb06298d4d9b..4a83c3ac79aa3 100644 --- a/expression/expression.go +++ b/expression/expression.go @@ -177,7 +177,7 @@ type Expression interface { // ScalarFunction: SFFlag+encoded function name + encoded arg_1 + encoded arg_2 + ... HashCode(sc *stmtctx.StatementContext) []byte - Restore() string + Restore(info *model.TableInfo) string } // CNFExprs stands for a CNF expression.