Skip to content

Commit

Permalink
feat: support com_qeury update sql request (#115)
Browse files Browse the repository at this point in the history
* feat: support com_qeury update sql request
  • Loading branch information
dk-lockdown committed May 28, 2022
1 parent 04fb507 commit 9eff19f
Show file tree
Hide file tree
Showing 21 changed files with 861 additions and 379 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang-module/carbon v1.6.6
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.7
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/patrickmn/go-cache v2.1.0+incompatible
Expand Down Expand Up @@ -54,4 +53,5 @@ require (
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
golang.org/x/sys v0.0.0-20220519141025-dcacdad47464 // indirect
golang.org/x/tools v0.1.10 // indirect
google.golang.org/protobuf v1.27.1
)
79 changes: 59 additions & 20 deletions pkg/dt/mysql_undo_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (

type BuildUndoSql func(undoLog undolog.SqlUndoLog) string

func DeleteBuildUndoSql(undoLog *undolog.SqlUndoLog) string {
func BuildDeleteUndoSql(undoLog *undolog.SqlUndoLog) string {
beforeImage := undoLog.BeforeImage
beforeImageRows := beforeImage.Rows

Expand All @@ -60,11 +60,11 @@ func DeleteBuildUndoSql(undoLog *undolog.SqlUndoLog) string {
var sbCols, sbVals strings.Builder
var size = len(fields)
for i, field := range fields {
fmt.Fprintf(&sbCols, "`%s`", field.Name)
fmt.Fprint(&sbVals, "?")
sbCols.WriteString(fmt.Sprintf("`%s`", field.Name))
sbVals.WriteByte('?')
if i < size-1 {
fmt.Fprint(&sbCols, ", ")
fmt.Fprint(&sbVals, ", ")
sbCols.WriteString(", ")
sbVals.WriteString(", ")
}
}
insertColumns := sbCols.String()
Expand All @@ -73,7 +73,7 @@ func DeleteBuildUndoSql(undoLog *undolog.SqlUndoLog) string {
return fmt.Sprintf(InsertSqlTemplate, undoLog.TableName, insertColumns, insertValues)
}

func InsertBuildUndoSql(undoLog *undolog.SqlUndoLog) string {
func BuildInsertUndoSql(undoLog *undolog.SqlUndoLog) string {
afterImage := undoLog.AfterImage
afterImageRows := afterImage.Rows
if len(afterImageRows) == 0 {
Expand All @@ -84,7 +84,7 @@ func InsertBuildUndoSql(undoLog *undolog.SqlUndoLog) string {
return fmt.Sprintf(DeleteSqlTemplate, undoLog.TableName, pkField.Name)
}

func UpdateBuildUndoSql(undoLog *undolog.SqlUndoLog) string {
func BuildUpdateUndoSql(undoLog *undolog.SqlUndoLog) string {
beforeImage := undoLog.BeforeImage
beforeImageRows := beforeImage.Rows

Expand Down Expand Up @@ -135,15 +135,15 @@ func (executor MysqlUndoExecutor) Execute(tx proto.Tx) error {
// DELETE FROM a WHERE pk = ?
switch executor.sqlUndoLog.SqlType {
case constant.SQLType_INSERT:
undoSql = InsertBuildUndoSql(executor.sqlUndoLog)
undoSql = BuildInsertUndoSql(executor.sqlUndoLog)
undoRows = *executor.sqlUndoLog.AfterImage

case constant.SQLType_DELETE:
undoSql = DeleteBuildUndoSql(executor.sqlUndoLog)
undoSql = BuildDeleteUndoSql(executor.sqlUndoLog)
undoRows = *executor.sqlUndoLog.BeforeImage

case constant.SQLType_UPDATE:
undoSql = UpdateBuildUndoSql(executor.sqlUndoLog)
undoSql = BuildUpdateUndoSql(executor.sqlUndoLog)
undoRows = *executor.sqlUndoLog.BeforeImage

default:
Expand Down Expand Up @@ -224,25 +224,64 @@ func (executor MysqlUndoExecutor) queryCurrentRecords(tx proto.Tx) (*schema.Tabl
pkValues = append(pkValues, field.Value)
}

if executor.sqlUndoLog.IsBinary {
selectSql := executor.buildCurrentRecordsForPrepareSql(tableMeta, pkName, pkValues)
dataTable, _, err := tx.ExecuteSql(context.Background(), selectSql, pkValues...)
if err != nil {
return nil, err
}
dt := dataTable.(*mysql.Result)
return schema.BuildBinaryRecords(tableMeta, dt), nil
} else {
selectSql := executor.buildCurrentRecordsForQuerySql(tableMeta, pkName, pkValues)
dataTable, _, err := tx.Query(context.Background(), selectSql)
if err != nil {
return nil, err
}
dt := dataTable.(*mysql.Result)
return schema.BuildTextRecords(tableMeta, dt), nil
}
}

func (executor MysqlUndoExecutor) buildCurrentRecordsForPrepareSql(tableMeta schema.TableMeta, pkColumn string, pkValues []interface{}) string {
var b strings.Builder
var i = 0
columnCount := len(tableMeta.Columns)
for _, columnName := range tableMeta.Columns {
fmt.Fprint(&b, misc.CheckAndReplace(columnName))
b.WriteString(misc.CheckAndReplace(columnName))
i = i + 1
if i < columnCount {
fmt.Fprint(&b, ",")
b.WriteByte(',')
} else {
fmt.Fprint(&b, " ")
b.WriteByte(' ')
}
}

inCondition := misc.MysqlAppendInParam(len(pkValues))
selectSql := fmt.Sprintf(SelectSqlTemplate, b.String(), tableMeta.TableName, pkName, inCondition)
dataTable, _, err := tx.ExecuteSql(context.Background(), selectSql, pkValues...)
if err != nil {
return nil, err
return fmt.Sprintf(SelectSqlTemplate, b.String(), tableMeta.TableName, pkColumn, inCondition)
}

func (executor MysqlUndoExecutor) buildCurrentRecordsForQuerySql(tableMeta schema.TableMeta, pkColumn string, pkValues []interface{}) string {
var columns strings.Builder
var inCondition strings.Builder
var i = 0
columnCount := len(tableMeta.Columns)
for _, columnName := range tableMeta.Columns {
columns.WriteString(misc.CheckAndReplace(columnName))
i = i + 1
if i < columnCount {
columns.WriteByte(',')
} else {
columns.WriteByte(' ')
}
}
inCondition.WriteByte('(')
for i, pk := range pkValues {
if i < len(pkValues)-1 {
inCondition.WriteString(fmt.Sprintf("'%s',", pk))
} else {
inCondition.WriteString(fmt.Sprintf("'%s'", pk))
}
}
dt := dataTable.(*mysql.Result)
return schema.BuildRecords(tableMeta, dt), nil
inCondition.WriteByte(')')
return fmt.Sprintf(SelectSqlTemplate, columns.String(), tableMeta.TableName, pkColumn, inCondition.String())
}
49 changes: 44 additions & 5 deletions pkg/dt/schema/table_records.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,20 @@ func BuildLockKey(lockKeyRecords *TableRecords) string {
}

var sb strings.Builder
fmt.Fprintf(&sb, lockKeyRecords.TableName)
fmt.Fprint(&sb, ":")
sb.WriteString(lockKeyRecords.TableName)
sb.WriteByte(':')
fields := lockKeyRecords.PKFields()
length := len(fields)
for i, field := range fields {
fmt.Fprint(&sb, field.Value)
sb.WriteString(fmt.Sprintf("%s", field.Value))
if i < length-1 {
fmt.Fprint(&sb, ",")
sb.WriteByte(',')
}
}
return sb.String()
}

func BuildRecords(meta TableMeta, result *mysql.Result) *TableRecords {
func BuildBinaryRecords(meta TableMeta, result *mysql.Result) *TableRecords {
records := NewTableRecords(meta)
rs := make([]*Row, 0)

Expand Down Expand Up @@ -114,3 +114,42 @@ func BuildRecords(meta TableMeta, result *mysql.Result) *TableRecords {
records.Rows = rs
return records
}

func BuildTextRecords(meta TableMeta, result *mysql.Result) *TableRecords {
records := NewTableRecords(meta)
rs := make([]*Row, 0)

for {
row, err := result.Rows.Next()
if err != nil {
break
}

textRow := mysql.TextRow{Row: row}
values, err := textRow.Decode()
if err != nil {
break
}
fields := make([]*Field, 0, len(result.Fields))
for i, col := range result.Fields {
field := &Field{
Name: col.FiledName(),
Type: meta.AllColumns[col.FiledName()].DataType,
}
if values[i] != nil {
field.Value = values[i].Val
}
if strings.EqualFold(col.FiledName(), meta.GetPKName()) {
field.KeyType = PrimaryKey
}
fields = append(fields, field)
}
r := &Row{Fields: fields}
rs = append(rs, r)
}
if len(rs) == 0 {
return nil
}
records.Rows = rs
return records
}
4 changes: 3 additions & 1 deletion pkg/dt/undolog/protobuf_undo_log_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"reflect"
"time"

"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/proto"
"vimagination.zapto.org/byteio"

"github.com/cectc/dbpack/pkg/constant"
Expand Down Expand Up @@ -223,6 +223,7 @@ func convertPbTableRecords(pbRecords *PbTableRecords) *schema.TableRecords {

func convertSqlUndoLog(undoLog *SqlUndoLog) *PbSqlUndoLog {
pbSqlUndoLog := &PbSqlUndoLog{
IsBinary: undoLog.IsBinary,
SqlType: int32(undoLog.SqlType),
SchemaName: undoLog.SchemaName,
TableName: undoLog.TableName,
Expand All @@ -242,6 +243,7 @@ func convertSqlUndoLog(undoLog *SqlUndoLog) *PbSqlUndoLog {

func convertPbSqlUndoLog(pbSqlUndoLog *PbSqlUndoLog) *SqlUndoLog {
sqlUndoLog := &SqlUndoLog{
IsBinary: pbSqlUndoLog.IsBinary,
SqlType: constant.SQLType(pbSqlUndoLog.SqlType),
SchemaName: pbSqlUndoLog.SchemaName,
TableName: pbSqlUndoLog.TableName,
Expand Down
1 change: 1 addition & 0 deletions pkg/dt/undolog/protobuf_undo_log_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func getBranchUndoLog() *BranchUndoLog {
BranchID: 2000042936,
SqlUndoLogs: []*SqlUndoLog{
{
IsBinary: true,
SqlType: constant.SQLType_INSERT,
TableName: "user",
BeforeImage: nil,
Expand Down
Loading

0 comments on commit 9eff19f

Please sign in to comment.