Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ticdc: Support Vector data type | tidb=master pd=master tikv=master #11538

Merged
merged 39 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
ba1663c
vector support
wk989898 Aug 12, 2024
20d340b
update
wk989898 Aug 12, 2024
2283416
.
wk989898 Aug 15, 2024
e3ec642
add dep
wk989898 Aug 22, 2024
2ce0160
Merge remote-tracking branch 'origin' into vector
wk989898 Aug 22, 2024
aca6b17
update
wk989898 Aug 22, 2024
826a24d
update
wk989898 Aug 26, 2024
173ddcc
Merge branch 'master' of https://github.com/wk989898/tiflow into vector
wk989898 Aug 26, 2024
d018cab
update
wk989898 Aug 27, 2024
352c435
fmt
wk989898 Aug 29, 2024
7686261
fix
wk989898 Aug 30, 2024
d6d5df6
add test
wk989898 Aug 30, 2024
68a0417
update test
wk989898 Sep 3, 2024
473ea11
update
wk989898 Sep 4, 2024
0536332
sink(ticdc): use admin statement to query async ddl status (#11535)
CharlesCheung96 Sep 2, 2024
17dd162
update
wk989898 Sep 5, 2024
3d41f03
chore
wk989898 Sep 6, 2024
3402020
update
wk989898 Sep 6, 2024
fd8d907
.
wk989898 Sep 6, 2024
4a5d301
test(ticdc,dm): modify download-integration-test-binaries script (#11…
wk989898 Sep 5, 2024
059bd0f
update
wk989898 Sep 6, 2024
ed6cf61
add test
wk989898 Sep 6, 2024
8ab796c
fix test
wk989898 Sep 6, 2024
d91e22d
chore
wk989898 Sep 7, 2024
a483fc2
fix
wk989898 Sep 8, 2024
d8ff62b
.
wk989898 Sep 8, 2024
d195be6
lint
wk989898 Sep 8, 2024
0054111
debug
wk989898 Sep 9, 2024
b43a678
revert
wk989898 Sep 9, 2024
0f38fa8
revert
wk989898 Sep 9, 2024
cae2309
update sync_diff_inspector_url
wk989898 Sep 11, 2024
f072df3
update test
wk989898 Sep 11, 2024
d70369d
test(ticdc): fix data inconsistence on integration_tests (#11584)
wk989898 Sep 11, 2024
c134bc0
update
wk989898 Sep 11, 2024
4b3bb17
set HasVectorType default false
wk989898 Sep 12, 2024
c157b20
chore: update print log
wk989898 Sep 12, 2024
d97adf5
fix test
wk989898 Sep 12, 2024
15ddca3
Revert "sink(ticdc): use admin statement to query async ddl status (#…
wk989898 Sep 20, 2024
8fb9173
remove unused code
wk989898 Sep 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cdc/entry/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ func unflatten(datum types.Datum, ft *types.FieldType, loc *time.Location) (type
byteSize := (ft.GetFlen() + 7) >> 3
datum.SetUint64(0)
datum.SetMysqlBit(types.NewBinaryLiteralFromUint(val, byteSize))
case mysql.TypeTiDBVectorFloat32:
datum.SetVectorFloat32(types.ZeroVectorFloat32)
return datum, nil
}
return datum, nil
}
3 changes: 3 additions & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,9 @@ func formatColVal(datum types.Datum, col *timodel.ColumnInfo) (
}
const sizeOfV = unsafe.Sizeof(v)
return v, int(sizeOfV), warn, nil
case mysql.TypeTiDBVectorFloat32:
b := datum.GetVectorFloat32()
return b, b.Len(), "", nil
default:
// NOTICE: GetValue() may return some types that go sql not support, which will cause sink DML fail
// Make specified convert upper if you need
Expand Down
78 changes: 53 additions & 25 deletions cdc/sink/ddlsink/mysql/async_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,29 @@ package mysql

import (
"context"
"database/sql"
"fmt"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/dumpling/export"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

// timeout is the deadline applied to the context in checkAsyncExecDDLDone,
// bounding one pass of status checks over all tables of a DDL.
const timeout = 5 * time.Second
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved

// TODO: Use the following SQL to check the DDL job status after TiDB optimizes
// the information_schema.ddl_jobs table. Ref: https://github.com/pingcap/tidb/issues/55725
//
// SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY
// FROM information_schema.ddl_jobs
var checkRunningAddIndexSQL = `
SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY
FROM information_schema.ddl_jobs
ADMIN SHOW DDL JOBS 1
WHERE DB_NAME = "%s"
AND TABLE_NAME = "%s"
AND JOB_TYPE LIKE "add index%%"
AND (STATE = "running" OR STATE = "queueing")
LIMIT 1;
AND (STATE = "running" OR STATE = "queueing");
`

func (m *DDLSink) shouldAsyncExecDDL(ddl *model.DDLEvent) bool {
Expand Down Expand Up @@ -92,9 +96,23 @@ func (m *DDLSink) asyncExecDDL(ctx context.Context, ddl *model.DDLEvent) error {
}
}

// needWaitAsyncExecDone reports whether the sink must wait for previously
// issued asynchronous DDLs before executing a DDL of type t.
//
// It returns false when the downstream is not TiDB, and false for the
// create-table, create-tables and create-schema actions; every other action
// type on a TiDB downstream returns true.
// NOTE(review): presumably creates are exempt because a brand-new table or
// schema cannot have a pending async job yet — confirm.
func (m *DDLSink) needWaitAsyncExecDone(t timodel.ActionType) bool {
	if m.cfg.IsTiDB {
		createsNewObject := t == timodel.ActionCreateTable ||
			t == timodel.ActionCreateTables ||
			t == timodel.ActionCreateSchema
		return !createsNewObject
	}
	return false
}

// Should always wait for async ddl done before executing the next ddl.
func (m *DDLSink) waitAsynExecDone(ctx context.Context, ddl *model.DDLEvent) {
if !m.cfg.IsTiDB {
if !m.needWaitAsyncExecDone(ddl.Type) {
return
}

Expand All @@ -105,16 +123,17 @@ func (m *DDLSink) waitAsynExecDone(ctx context.Context, ddl *model.DDLEvent) {
if ddl.PreTableInfo != nil {
tables[ddl.PreTableInfo.TableName] = struct{}{}
}
if len(tables) == 0 || m.checkAsyncExecDDLDone(ctx, tables) {
return
}

log.Debug("wait async exec ddl done",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Any("tables", tables),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
if len(tables) == 0 || m.checkAsyncExecDDLDone(ctx, tables) {
return
}

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
Expand All @@ -131,6 +150,8 @@ func (m *DDLSink) waitAsynExecDone(ctx context.Context, ddl *model.DDLEvent) {
}

func (m *DDLSink) checkAsyncExecDDLDone(ctx context.Context, tables map[model.TableName]struct{}) bool {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
for table := range tables {
done := m.doCheck(ctx, table)
if !done {
Expand All @@ -141,6 +162,7 @@ func (m *DDLSink) checkAsyncExecDDLDone(ctx context.Context, tables map[model.Ta
}

func (m *DDLSink) doCheck(ctx context.Context, table model.TableName) (done bool) {
start := time.Now()
if v, ok := m.lastExecutedNormalDDLCache.Get(table); ok {
ddlType := v.(timodel.ActionType)
if ddlType == timodel.ActionAddIndex {
Expand All @@ -152,35 +174,41 @@ func (m *DDLSink) doCheck(ctx context.Context, table model.TableName) (done bool
return true
}

ret := m.db.QueryRowContext(ctx, fmt.Sprintf(checkRunningAddIndexSQL, table.Schema, table.Table))
if ret.Err() != nil {
rows, err := m.db.QueryContext(ctx, fmt.Sprintf(checkRunningAddIndexSQL, table.Schema, table.Table))
defer func() {
if rows != nil {
_ = rows.Err()
}
}()
if err != nil {
log.Error("check async exec ddl failed",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Error(ret.Err()))
zap.Error(err))
return true
}
var jobID, jobType, schemaState, schemaID, tableID, state, query string
if err := ret.Scan(&jobID, &jobType, &schemaState, &schemaID, &tableID, &state, &query); err != nil {
if !errors.Is(err, sql.ErrNoRows) {
log.Error("check async exec ddl failed",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Error(err))
}
rets, err := export.GetSpecifiedColumnValuesAndClose(rows, "JOB_ID", "JOB_TYPE", "SCHEMA_STATE", "STATE")
if err != nil {
log.Error("check async exec ddl failed",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Error(err))
return true
}

if len(rets) == 0 {
return true
}
ret := rets[0]
jobID, jobType, schemaState, state := ret[0], ret[1], ret[2], ret[3]
log.Info("async ddl is still running",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Duration("checkDuration", time.Since(start)),
zap.String("table", table.String()),
zap.String("jobID", jobID),
zap.String("jobType", jobType),
zap.String("schemaState", schemaState),
zap.String("schemaID", schemaID),
zap.String("tableID", tableID),
zap.String("state", state),
zap.String("query", query))
zap.String("state", state))
return false
}
28 changes: 26 additions & 2 deletions cdc/sink/ddlsink/mysql/async_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,14 @@ func TestWaitAsynExecDone(t *testing.T) {

// Case 1: there is a running add index job
mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnRows(
sqlmock.NewRows([]string{"JOB_ID", "JOB_TYPE", "SCHEMA_STATE", "SCHEMA_ID", "TABLE_ID", "STATE", "QUERY"}).
AddRow("1", "add index", "running", "1", "1", "running", "Create index idx1 on test.sbtest0(a)"),
sqlmock.NewRows([]string{
"JOB_ID", "DB_NAME", "TABLE_NAME", "JOB_TYPE", "SCHEMA_STATE", "SCHEMA_ID", "TABLE_ID",
"ROW_COUNT", "CREATE_TIME", "START_TIME", "END_TIME", "STATE",
}).AddRow(
1, "test", "sbtest0", "add index", "write reorganization", 1, 1, 0, time.Now(), nil, time.Now(), "running",
).AddRow(
2, "test", "sbtest0", "add index", "write reorganization", 1, 1, 0, time.Now(), time.Now(), time.Now(), "queueing",
),
)
// Case 2: there is no running add index job
// Case 3: no permission to query ddl_jobs, TiDB will return empty result
Expand Down Expand Up @@ -157,3 +163,21 @@ func TestAsyncExecAddIndex(t *testing.T) {
require.True(t, time.Since(start) >= 10*time.Second)
sink.Close()
}

func TestNeedWaitAsyncExecDone(t *testing.T) {
sink := &DDLSink{
cfg: &pmysql.Config{
IsTiDB: false,
},
}
require.False(t, sink.needWaitAsyncExecDone(timodel.ActionTruncateTable))

sink.cfg.IsTiDB = true
require.True(t, sink.needWaitAsyncExecDone(timodel.ActionTruncateTable))
require.True(t, sink.needWaitAsyncExecDone(timodel.ActionDropTable))
require.True(t, sink.needWaitAsyncExecDone(timodel.ActionDropIndex))

require.False(t, sink.needWaitAsyncExecDone(timodel.ActionCreateTable))
require.False(t, sink.needWaitAsyncExecDone(timodel.ActionCreateTables))
require.False(t, sink.needWaitAsyncExecDone(timodel.ActionCreateSchema))
}
62 changes: 62 additions & 0 deletions cdc/sink/ddlsink/mysql/format_ddl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"bytes"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/format"
"github.com/pingcap/tidb/pkg/parser/mysql"
"go.uber.org/zap"
)

// visiter is the AST visitor handed to stmt.Accept by formatQuery; its Enter
// method rewrites column definitions whose type is TiDB's float32 vector.
type visiter struct{}
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved

// Enter is called for every node before its children are visited. When the
// node is a column definition of type TiDBVectorFloat32, the definition is
// rewritten in place to a LONGBLOB field type with charset, collation and
// length cleared (TestFormatQuery expects this to restore as LONGTEXT).
//
// The node itself is always returned and children are never skipped.
func (f *visiter) Enter(n ast.Node) (node ast.Node, skipChildren bool) {
	col, isColumnDef := n.(*ast.ColumnDef)
	// A column definition may carry no field type at all; guard against a nil
	// Tp so that GetType does not dereference a nil pointer.
	if !isColumnDef || col == nil || col.Tp == nil {
		return n, false
	}

	if col.Tp.GetType() == mysql.TypeTiDBVectorFloat32 {
		col.Tp.SetType(mysql.TypeLongBlob)
		col.Tp.SetCharset("")
		col.Tp.SetCollate("")
		col.Tp.SetFlen(-1)
		// Intended to clear COMMENT. Note that this replaces the whole slice,
		// so every other column option on the vector column is dropped too.
		col.Options = []*ast.ColumnOption{}
	}
	return n, false
}

// Leave is the post-visit hook paired with Enter. It does no work: the node
// is returned unchanged and ok is always true.
func (f *visiter) Leave(n ast.Node) (node ast.Node, ok bool) {
return n, true
}

func formatQuery(sql string) string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only rewrite the DDL ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the vector format is similar to string in DML.

p := parser.New()
stmt, err := p.ParseOneStmt(sql, "", "")
if err != nil {
log.Error("format query parse one stmt failed", zap.Error(err))
}
stmt.Accept(&visiter{})

buf := new(bytes.Buffer)
restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, buf)
if err = stmt.Restore(restoreCtx); err != nil {
log.Error("format query restore failed", zap.Error(err))
}
return buf.String()
}
43 changes: 43 additions & 0 deletions cdc/sink/ddlsink/mysql/format_ddl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"bytes"
"testing"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/format"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestFormatQuery(t *testing.T) {
sql := "CREATE TABLE `test` (`id` INT PRIMARY KEY,`data` VECTOR(5));"
expectSql := "CREATE TABLE `test` (`id` INT PRIMARY KEY,`data` LONGTEXT)"
p := parser.New()
stmt, err := p.ParseOneStmt(sql, "", "")
if err != nil {
log.Error("format query parse one stmt failed", zap.Error(err))
}
stmt.Accept(&visiter{})

buf := new(bytes.Buffer)
restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, buf)
if err = stmt.Restore(restoreCtx); err != nil {
log.Error("format query restore failed", zap.Error(err))
}
require.Equal(t, buf.String(), expectSql)
}
Loading