Skip to content

Commit

Permalink
Merge pull request #4 from rebelice/dev_where
Browse files Browse the repository at this point in the history
create `aws` SQL, support WHERE for TableScan and Selection
  • Loading branch information
crazycs520 authored Jan 6, 2022
2 parents ed3dee3 + 5812cfe commit bf88cdc
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 7 deletions.
49 changes: 48 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package executor
import (
"bytes"
"context"
"github.com/pingcap/tipb/go-tipb"
"math"
"sort"
"strconv"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/infoschema"
intervalutil "github.com/pingcap/tidb/interval/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -64,7 +66,6 @@ import (
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -3106,6 +3107,37 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe
return gather
}

func buildAWSQueryInfo(v *plannercore.PhysicalTableReader, id int64) *RestoreData {
info := RestoreData{Where: make([]string, 0)}
ts := v.GetTableScan()
var tableInfo *model.TableInfo
for _, p := range v.TablePlans {
switch x := p.(type) {
case *plannercore.PhysicalTableScan:
info.Table = intervalutil.GetTablePartitionName(x.Table.Name.L, id)
info.DB = "test"
tableInfo = x.Table
var unsignedIntHandle bool
if ts.Table.PKIsHandle {
if pkColInfo := ts.Table.GetPkColInfo(); pkColInfo != nil {
unsignedIntHandle = mysql.HasUnsignedFlag(pkColInfo.Flag)
}
}
for _, r := range x.Ranges {
if r.IsFullRange(unsignedIntHandle) {
continue
}
info.Where = append(info.Where, r.RestoreString(x.Table.GetPkColName())...)
}
case *plannercore.PhysicalSelection:
for _, c := range x.Conditions {
info.Where = append(info.Where, c.Restore(tableInfo))
}
}
}
return &info
}

// buildTableReader builds a table reader executor. It first build a no range table reader,
// and then update it ranges from table scan plan.
func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) Executor {
Expand Down Expand Up @@ -3142,6 +3174,10 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)

if ok, id := readFromS3(ts); ok {
ret.AWSQueryInfo = buildAWSQueryInfo(v, id)
}

if !b.ctx.GetSessionVars().UseDynamicPartitionPrune() {
return ret
}
Expand Down Expand Up @@ -4560,6 +4596,17 @@ func getPhysicalTableID(t table.Table) int64 {
return t.Meta().ID
}

func readFromS3(t *plannercore.PhysicalTableScan) (bool, int64) {
if ok, id := t.IsPartition(); ok {
for _, p := range t.Table.Partition.Definitions {
if p.ID == id {
return p.Engine == kv.AWSS3Engine, p.ID
}
}
}
return false, 0
}

func getPhysicalTableEngine(t table.Table) (int64, string) {
if p, ok := t.(table.PhysicalTable); ok {
pid := p.GetPhysicalID()
Expand Down
29 changes: 26 additions & 3 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package executor

import (
"bytes"
"context"
"fmt"
"sort"
Expand All @@ -24,7 +25,6 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/interval/athena"
"github.com/pingcap/tidb/interval/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -116,9 +116,32 @@ type TableReaderExecutor struct {
// extraPIDColumnIndex is used for partition reader to add an extra partition ID column.
extraPIDColumnIndex offsetOptional

AWSQueryInfo *RestoreData
awsQueryResult *awsathena.ResultSet
}

type RestoreData struct {
DB string
Table string
Where []string
}

func (d *RestoreData) String() string {
var buffer bytes.Buffer
fmt.Fprint(&buffer, "select * from ")
fmt.Fprint(&buffer, d.DB)
fmt.Fprint(&buffer, ".")
fmt.Fprint(&buffer, d.Table)
fmt.Fprint(&buffer, " where ")
for i, c := range d.Where {
if i != 0 {
fmt.Fprint(&buffer, " and ")
}
fmt.Fprint(&buffer, c)
}
return buffer.String()
}

// offsetOptional may be a positive integer, or invalid.
type offsetOptional int

Expand Down Expand Up @@ -387,8 +410,8 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [
}

func (e *TableReaderExecutor) fetchResultFromAws(pid int64) error {
tableName := util.GetTablePartitionName(e.table.Meta().Name.L, pid)
query := fmt.Sprintf("SELECT * FROM \"%v\".\"%v\" ", "test", tableName)
query := e.AWSQueryInfo.String()
logutil.BgLogger().Info(fmt.Sprintf("[aws query] %v", query))
cli, err := athena.CreateCli("us-west-2")
if err != nil {
return nil
Expand Down
66 changes: 63 additions & 3 deletions expression/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package expression
import (
"bytes"
"fmt"
"sort"
"strings"

"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"sort"
"strings"
)

// ExplainInfo implements the Expression interface.
Expand Down Expand Up @@ -60,6 +60,66 @@ func (expr *ScalarFunction) explainInfo(normalized bool) string {
return buffer.String()
}

func (expr *ScalarFunction) Restore(t *model.TableInfo) string {
return expr.restore(t)
}

func (expr *ScalarFunction) restore(t *model.TableInfo) string {
var buffer bytes.Buffer
switch expr.FuncName.L {
case ast.LT:
fmt.Fprint(&buffer, "(")
buffer.WriteString(expr.GetArgs()[0].Restore(t))
fmt.Fprint(&buffer, " < ")
buffer.WriteString(expr.GetArgs()[1].Restore(t))
fmt.Fprint(&buffer, ")")
case ast.GT:
fmt.Fprint(&buffer, "(")
buffer.WriteString(expr.GetArgs()[0].Restore(t))
fmt.Fprint(&buffer, " > ")
buffer.WriteString(expr.GetArgs()[1].Restore(t))
fmt.Fprint(&buffer, ")")
case ast.LE:
fmt.Fprint(&buffer, "(")
buffer.WriteString(expr.GetArgs()[0].Restore(t))
fmt.Fprint(&buffer, " <= ")
buffer.WriteString(expr.GetArgs()[1].Restore(t))
fmt.Fprint(&buffer, ")")
case ast.GE:
fmt.Fprint(&buffer, "(")
buffer.WriteString(expr.GetArgs()[0].Restore(t))
fmt.Fprint(&buffer, " >= ")
buffer.WriteString(expr.GetArgs()[1].Restore(t))
fmt.Fprint(&buffer, ")")
case ast.EQ:
fmt.Fprint(&buffer, "(")
buffer.WriteString(expr.GetArgs()[0].Restore(t))
fmt.Fprint(&buffer, " = ")
buffer.WriteString(expr.GetArgs()[1].Restore(t))
fmt.Fprint(&buffer, ")")
case ast.NE:
fmt.Fprint(&buffer, "(")
buffer.WriteString(expr.GetArgs()[0].Restore(t))
fmt.Fprint(&buffer, " != ")
buffer.WriteString(expr.GetArgs()[1].Restore(t))
fmt.Fprint(&buffer, ")")
}
return buffer.String()
}

func (col *Column) Restore(t *model.TableInfo) string {
for _, c := range t.Columns {
if c.ID == col.ID {
return c.Name.L
}
}
return ""
}

func (expr *Constant) Restore(t *model.TableInfo) string {
return expr.ExplainInfo()
}

// ExplainNormalizedInfo implements the Expression interface.
func (expr *ScalarFunction) ExplainNormalizedInfo() string {
return expr.explainInfo(true)
Expand Down
2 changes: 2 additions & 0 deletions expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ type Expression interface {
// Column: ColumnFlag+encoded value
// ScalarFunction: SFFlag+encoded function name + encoded arg_1 + encoded arg_2 + ...
HashCode(sc *stmtctx.StatementContext) []byte

Restore(info *model.TableInfo) string
}

// CNFExprs stands for a CNF expression.
Expand Down
10 changes: 10 additions & 0 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,16 @@ func (t *TableInfo) GetPkColInfo() *ColumnInfo {
return nil
}

func (t *TableInfo) GetPkColName() []string {
ret := make([]string, 0, 1)
for _, colInfo := range t.Columns {
if mysql.HasPriKeyFlag(colInfo.Flag) {
return append(ret, colInfo.Name.L)
}
}
return ret
}

func (t *TableInfo) GetAutoIncrementColInfo() *ColumnInfo {
for _, colInfo := range t.Columns {
if mysql.HasAutoIncrementFlag(colInfo.Flag) {
Expand Down
34 changes: 34 additions & 0 deletions util/ranger/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,40 @@ func (ran *Range) String() string {
return l + strings.Join(lowStrs, " ") + "," + strings.Join(highStrs, " ") + r
}

func (ran *Range) RestoreString(col []string) []string {
if len(col) != len(ran.LowVal) {
return nil
}
lowStrs := make([]string, 0, len(ran.LowVal))
for _, d := range ran.LowVal {
lowStrs = append(lowStrs, formatDatum(d, true))
}
highStrs := make([]string, 0, len(ran.LowVal))
for _, d := range ran.HighVal {
highStrs = append(highStrs, formatDatum(d, false))
}

ret := make([]string, 0, len(col)*2)

for i := range col {
if strings.Compare(lowStrs[i], "-inf") != 0 {
if ran.LowExclude {
ret = append(ret, fmt.Sprintf("%v > %v", col[i], lowStrs[i]))
} else {
ret = append(ret, fmt.Sprintf("%v >= %v", col[i], lowStrs[i]))
}
}
if strings.Compare(highStrs[i], "+inf") != 0 {
if ran.HighExclude {
ret = append(ret, fmt.Sprintf("%v < %v", col[i], highStrs[i]))
} else {
ret = append(ret, fmt.Sprintf("%v <= %v", col[i], highStrs[i]))
}
}
}
return ret
}

// Encode encodes the range to its encoded value.
func (ran *Range) Encode(sc *stmtctx.StatementContext, lowBuffer, highBuffer []byte) ([]byte, []byte, error) {
var err error
Expand Down

0 comments on commit bf88cdc

Please sign in to comment.