Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

just for run tests #392

Closed
wants to merge 47 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
0f4b892
add metrics
Mar 28, 2020
da05fdc
add metrics
Mar 28, 2020
93dcc9b
put changefeed
Mar 28, 2020
491d8f4
put changefeed
Mar 28, 2020
3986fb6
put changefeed
Mar 28, 2020
713d3f9
put changefeed
Mar 28, 2020
0561898
linshi
Mar 28, 2020
3837ed4
add log
Mar 28, 2020
32ab246
add log
Mar 28, 2020
cac3c28
skip resolvedts
Mar 28, 2020
7667a98
add log
Mar 28, 2020
e8eef45
update
Mar 28, 2020
f1f2809
update
Mar 28, 2020
997b6dd
Merge remote-tracking branch 'pingcap/master' into fix_sorted
Mar 29, 2020
d39a891
Merge remote-tracking branch 'origin/add_metrics' into fix_sorted
Mar 29, 2020
a6018c2
Merge remote-tracking branch 'origin/add_metrics' into fix_sorted
Mar 29, 2020
5b01582
Merge remote-tracking branch 'origin/add_metrics' into fix_sorted
Mar 29, 2020
6f8df38
increase chan size, fix grafana template
amyangfei Mar 29, 2020
0cd560d
Merge remote-tracking branch 'origin/add_metrics' into fix_sorted
Mar 29, 2020
6c77ef8
Merge remote-tracking branch 'yangfei/chan-metrics-2' into fix_sorted
Mar 29, 2020
0a09b08
skip mounter
Mar 29, 2020
8b158f0
skip mounter
Mar 29, 2020
b3d7676
skip mounter
Mar 29, 2020
32594f9
Merge remote-tracking branch 'pingcap/master' into fix_sorted
Mar 29, 2020
7c13ff9
skip sink
Mar 29, 2020
2dcfb06
fix panic
Mar 29, 2020
0f69cf9
update
Mar 29, 2020
6106982
update
Mar 29, 2020
2999b76
update
Mar 29, 2020
dbb338f
update
Mar 29, 2020
ad0b309
update
Mar 29, 2020
89218c8
update
Mar 29, 2020
26c4b35
update
Mar 29, 2020
bb573d3
update
Mar 30, 2020
de84829
update
Mar 30, 2020
95822ee
update
Mar 30, 2020
d8e1fd1
update
Mar 30, 2020
42c89c7
update
Mar 30, 2020
0ca75f6
update
Mar 30, 2020
66ec126
update
Mar 30, 2020
39b8899
Merge remote-tracking branch 'pingcap/master' into fix_sorted
Mar 30, 2020
6eab389
update
Mar 30, 2020
ecb8181
Merge remote-tracking branch 'origin/fix_sorted' into faster_sorter
Mar 30, 2020
ee61f61
split async
Mar 30, 2020
9b113de
split async
Mar 30, 2020
7e1b6bd
split async
Mar 30, 2020
30f8e10
gzip
Mar 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions cdc/entry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,24 @@

package entry

import (
"github.com/prometheus/client_golang/prometheus"
)
import "github.com/prometheus/client_golang/prometheus"

var (
tableMountedResolvedTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "entry",
Name: "table_mounted_resolved_ts",
Help: "real local resolved ts of processor",
}, []string{"changefeed", "capture", "table"})
ddlResolvedTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "entry",
Name: "table_ddl_resolved_ts",
Help: "real local resolved ts of processor",
}, []string{"changefeed", "capture"})

mounterOutputChanSizeGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Expand All @@ -29,5 +42,7 @@ var (

// InitMetrics registers every metric declared in this file with the given
// Prometheus registry. It panics (via MustRegister) on duplicate registration.
func InitMetrics(registry *prometheus.Registry) {
	// MustRegister is variadic; register all collectors in one call.
	registry.MustRegister(
		ddlResolvedTsGauge,
		tableMountedResolvedTsGauge,
		mounterOutputChanSizeGauge,
	)
}
148 changes: 120 additions & 28 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"math"
"strconv"
"time"

"golang.org/x/sync/errgroup"

"github.com/pingcap/errors"
"github.com/pingcap/log"
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"
Expand Down Expand Up @@ -77,6 +82,7 @@ func (idx *indexKVEntry) unflatten(tableInfo *TableInfo) error {
}
return nil
}

func isDistinct(index *timodel.IndexInfo, indexValue []types.Datum) bool {
if index.Primary {
return true
Expand All @@ -102,6 +108,7 @@ type mounterImpl struct {
schemaStorage *Storage
rawRowChangedCh <-chan *model.RawKVEntry
output chan *model.RowChangedEvent
unmarshalRow chan *model.RawKVEntry
}

// NewMounter creates a mounter
Expand All @@ -110,43 +117,124 @@ func NewMounter(rawRowChangedCh <-chan *model.RawKVEntry, schemaStorage *Storage
schemaStorage: schemaStorage,
rawRowChangedCh: rawRowChangedCh,
output: make(chan *model.RowChangedEvent, defaultOutputChanSize),
unmarshalRow: make(chan *model.RawKVEntry, defaultOutputChanSize),
}
}

// Run drives the mounter: it consumes raw KV entries from m.rawRowChangedCh,
// records the table's mounted resolved-ts metric, and hands each entry to the
// unmarshal pipeline via m.unmarshalRow, with concurrent goroutines managed by
// an errgroup.
//
// NOTE(review): this span is a rendered diff that interleaves removed
// (pre-change) and added (post-change) lines without markers, so it does not
// parse as-is — reconcile against the actual post-merge file before relying on
// it. The lines flagged below appear to be stale pre-change code.
func (m *mounterImpl) Run(ctx context.Context) error {
// NOTE(review): the next three lines look like the REMOVED pre-change body
// (collectMetrics moved into an errgroup goroutine below).
go func() {
m.collectMetrics(ctx)
}()
// Capture/changefeed/table identity is read once from ctx for metric labels.
captureID := util.CaptureIDFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
tableID := util.TableIDFromCtx(ctx)

// NOTE(review): this first receive loop also appears to be stale pre-change
// code superseded by the errgroup goroutine further down.
for {
var rawRow *model.RawKVEntry
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case rawRow = <-m.rawRowChangedCh:
}
if rawRow == nil {
continue
}
// cctx is cancelled as soon as any goroutine in the group returns an error.
errg, cctx := errgroup.WithContext(ctx)

if err := m.schemaStorage.HandlePreviousDDLJobIfNeed(rawRow.Ts); err != nil {
return errors.Trace(err)
}
// Metrics collection runs for the lifetime of the group; it never fails.
errg.Go(func() error {
m.collectMetrics(cctx)
return nil
})

if rawRow.OpType == model.OpTypeResolved {
m.output <- &model.RowChangedEvent{Resolved: true, Ts: rawRow.Ts}
continue
}
// Receive goroutine: pull raw rows, apply pending DDL, record the resolved-ts
// watermark, and forward everything to the unmarshal workers.
errg.Go(func() error {
for {
var rawRow *model.RawKVEntry
select {
case <-cctx.Done():
return errors.Trace(cctx.Err())
case rawRow = <-m.rawRowChangedCh:
}
// nil entries are heartbeat/no-op values; skip them.
if rawRow == nil {
continue
}

// Make sure all DDL jobs up to this ts have been applied to the schema
// before any row at this ts is mounted.
if err := m.schemaStorage.HandlePreviousDDLJobIfNeed(rawRow.Ts); err != nil {
return errors.Trace(err)
}

if rawRow.OpType == model.OpTypeResolved {
// Export the physical (wall-clock) part of the resolved ts per table.
tableMountedResolvedTsGauge.WithLabelValues(changefeedID, captureID, strconv.FormatInt(tableID, 10)).Set(float64(oracle.ExtractPhysical(rawRow.Ts)))
}
// Hand off to the unmarshalWorker fan-out for concurrent mounting.
m.unmarshalRow <- rawRow

// NOTE(review): the block below (inline unmarshal + direct m.output send)
// looks like stale pre-change code — in the new pipeline mounting happens in
// unmarshalWorker; as written the enclosing `for` is also left unclosed.
event, err := m.unmarshalAndMountRowChanged(rawRow)
if err != nil {
return errors.Trace(err)
}
if event == nil {
continue
}
m.output <- event
})

// Fan-out/merge stage: unmarshals rows concurrently and re-sorts by Ts.
errg.Go(func() error {
return m.unmarshalWorker(cctx)
})
return errg.Wait()
}

// workerNum is the number of concurrent unmarshal goroutines.
const workerNum = 4

// unmarshalWorker fans raw KV entries received on m.unmarshalRow out to
// workerNum goroutines that unmarshal and mount them concurrently, then
// merges the per-worker results back into m.output in ascending Ts order.
//
// Ordering: every resolved ts is broadcast to all worker channels, so each
// per-worker stream carries the same watermarks; a k-way merge that always
// forwards the event with the smallest Ts therefore yields a globally sorted
// stream. It returns only on context cancellation or an unmarshal error.
func (m *mounterImpl) unmarshalWorker(ctx context.Context) error {
	errg, cctx := errgroup.WithContext(ctx)
	eventChs := make([]chan *model.RowChangedEvent, workerNum)

	for i := 0; i < workerNum; i++ {
		i := i // capture the loop variable for the closure below
		eventChs[i] = make(chan *model.RowChangedEvent, defaultOutputChanSize)
		errg.Go(func() error {
			var rawRow *model.RawKVEntry
			for {
				select {
				case <-cctx.Done():
					return errors.Trace(cctx.Err())
				case rawRow = <-m.unmarshalRow:
				}
				if rawRow.OpType == model.OpTypeResolved {
					// Broadcast the resolved ts to every worker stream so the
					// merge goroutine sees it as a watermark on each channel.
					for j := 0; j < workerNum; j++ {
						eventChs[j] <- &model.RowChangedEvent{Ts: rawRow.Ts, Resolved: true}
					}
					continue
				}
				event, err := m.unmarshalAndMountRowChanged(rawRow)
				if err != nil {
					return errors.Trace(err)
				}
				if event == nil {
					// Entries outside table-row keyspace mount to nil; skip.
					continue
				}
				eventChs[i] <- event
			}
		})
	}

	errg.Go(func() error {
		// NOTE: eventChs are deliberately NOT closed here. This goroutine is a
		// receiver; the workers above are the senders and may still be sending
		// when cancellation unwinds this goroutine, so closing the channels
		// here could panic with "send on closed channel". No receiver ranges
		// over them, so leaving them open is safe — the GC reclaims them.
		events := make([]*model.RowChangedEvent, workerNum)
		var lastResolvedTs uint64
		for {
			// Refill the head slot of every worker stream, then forward the
			// event with the minimal Ts to keep the merged output sorted.
			minTs := uint64(math.MaxUint64)
			var minChIndex int
			for i := 0; i < workerNum; i++ {
				if events[i] == nil {
					select {
					case events[i] = <-eventChs[i]:
					case <-cctx.Done():
						return errors.Trace(cctx.Err())
					}
				}
				if minTs > events[i].Ts {
					minTs = events[i].Ts
					minChIndex = i
				}
			}
			if events[minChIndex].Resolved {
				// The resolved ts was broadcast to every worker; forward only
				// the first copy to avoid duplicate watermarks downstream.
				if events[minChIndex].Ts != lastResolvedTs {
					m.output <- events[minChIndex]
					lastResolvedTs = events[minChIndex].Ts
				}
			} else {
				m.output <- events[minChIndex]
			}
			events[minChIndex] = nil
		}
	})
	return errg.Wait()
}

func (m *mounterImpl) Output() <-chan *model.RowChangedEvent {
Expand All @@ -167,6 +255,7 @@ func (m *mounterImpl) collectMetrics(ctx context.Context) {
}
}()
}

func (m *mounterImpl) unmarshalAndMountRowChanged(raw *model.RawKVEntry) (*model.RowChangedEvent, error) {
if !bytes.HasPrefix(raw.Key, tablePrefix) {
return nil, nil
Expand Down Expand Up @@ -404,7 +493,10 @@ func formatColVal(value interface{}, tp byte) (interface{}, error) {
}

switch tp {
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp, mysql.TypeDuration, mysql.TypeDecimal, mysql.TypeNewDecimal, mysql.TypeJSON:
case mysql.TypeDate, mysql.TypeDatetime,
mysql.TypeNewDate, mysql.TypeTimestamp,
mysql.TypeDuration, mysql.TypeDecimal,
mysql.TypeNewDecimal, mysql.TypeJSON:
value = fmt.Sprintf("%v", value)
case mysql.TypeEnum:
value = value.(types.Enum).Value
Expand Down
8 changes: 8 additions & 0 deletions cdc/entry/schema_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"sync"
"sync/atomic"

"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/tikv/oracle"

"github.com/pingcap/errors"
"github.com/pingcap/log"
timodel "github.com/pingcap/parser/model"
Expand Down Expand Up @@ -128,6 +131,9 @@ func NewStorageBuilder(historyDDL []*timodel.Job, ddlEventCh <-chan *model.RawKV

// Run runs the StorageBuilder
func (b *StorageBuilder) Run(ctx context.Context) error {

captureID := util.CaptureIDFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
for {
var rawKV *model.RawKVEntry
select {
Expand All @@ -143,6 +149,8 @@ func (b *StorageBuilder) Run(ctx context.Context) error {
}

if rawKV.OpType == model.OpTypeResolved {
ddlResolvedTsGauge.WithLabelValues(changefeedID, captureID).Set(float64(oracle.ExtractPhysical(rawKV.Ts)))

atomic.StoreUint64(&b.resolvedTs, rawKV.Ts)
continue
}
Expand Down
1 change: 1 addition & 0 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func WrapTableInfo(info *timodel.TableInfo) *TableInfo {
if mysql.HasPriKeyFlag(col.Flag) {
// Prepend to make sure the primary key ends up at the front
ti.UniqueColumns[col.ID] = struct{}{}
ti.IndieMarkCol = col.Name.O
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (a *connArray) Init(ctx context.Context) error {
Timeout: 3 * time.Second,
PermitWithoutStream: true,
}),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(32*1024*1024)),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(128*1024*1024)),
)
cancel()

Expand Down
2 changes: 1 addition & 1 deletion cdc/owner_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type ddlHandler struct {
func newDDLHandler(pdCli pd.Client, checkpointTS uint64) *ddlHandler {
// The key in DDL kv pair returned from TiKV is already memcompariable encoded,
// so we set `needEncode` to false.
puller := puller.NewPuller(pdCli, checkpointTS, []util.Span{util.GetDDLSpan(), util.GetAddIndexDDLSpan()}, false, nil)
puller := puller.NewPuller(pdCli, checkpointTS, 0, []util.Span{util.GetDDLSpan(), util.GetAddIndexDDLSpan()}, false, nil)
ctx, cancel := context.WithCancel(context.Background())
h := &ddlHandler{
puller: puller,
Expand Down
5 changes: 3 additions & 2 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func newProcessor(

// The key in DDL kv pair returned from TiKV is already memcompariable encoded,
// so we set `needEncode` to false.
ddlPuller := puller.NewPuller(pdCli, checkpointTs, []util.Span{util.GetDDLSpan(), util.GetAddIndexDDLSpan()}, false, limitter)
ddlPuller := puller.NewPuller(pdCli, checkpointTs, 0, []util.Span{util.GetDDLSpan(), util.GetAddIndexDDLSpan()}, false, limitter)
ddlEventCh := ddlPuller.SortedOutput(ctx)
schemaBuilder, err := createSchemaBuilder(endpoints, ddlEventCh)
if err != nil {
Expand Down Expand Up @@ -565,6 +565,7 @@ func createTsRWriter(cli kv.CDCEtcdClient, changefeedID, captureID string) (stor
func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64) {
p.tablesMu.Lock()
defer p.tablesMu.Unlock()
ctx = util.PutTableIDInCtx(ctx, tableID)

log.Debug("Add table", zap.Int64("tableID", tableID))
if _, ok := p.tables[tableID]; ok {
Expand All @@ -582,7 +583,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64)
// The key in DML kv pair returned from TiKV is not memcompariable encoded,
// so we set `needEncode` to true.
span := util.GetTableSpan(tableID, true)
puller := puller.NewPuller(p.pdCli, startTs, []util.Span{span}, true, p.limitter)
puller := puller.NewPuller(p.pdCli, startTs, tableID, []util.Span{span}, true, p.limitter)
go func() {
err := puller.Run(ctx)
if errors.Cause(err) != context.Canceled {
Expand Down
Loading