Merge branch 'release-5.0' into clean-changefeed-metric
amyangfei authored Jul 16, 2021
2 parents ef6be3d + e94f5f5 commit 1129bd3
Showing 26 changed files with 729 additions and 219 deletions.
12 changes: 7 additions & 5 deletions Makefile
@@ -78,14 +78,14 @@ build: cdc

build-failpoint:
$(FAILPOINT_ENABLE)
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./main.go
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc/main.go
$(FAILPOINT_DISABLE)

cdc:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./main.go
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc/main.go

kafka_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./kafka_consumer/main.go
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer/main.go

install:
go install ./...
@@ -126,9 +126,9 @@ integration_test_build: check_failpoint_ctl
$(FAILPOINT_ENABLE)
$(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \
-coverpkg=github.com/pingcap/ticdc/... \
-o bin/cdc.test github.com/pingcap/ticdc \
-o bin/cdc.test github.com/pingcap/ticdc/cmd/cdc \
|| { $(FAILPOINT_DISABLE); exit 1; }
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./main.go \
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc/main.go \
|| { $(FAILPOINT_DISABLE); exit 1; }
$(FAILPOINT_DISABLE)

@@ -193,6 +193,8 @@ data-flow-diagram: docs/data-flow.dot
clean:
go clean -i ./...
rm -rf *.out
rm -f bin/cdc
rm -f bin/cdc_kafka_consumer

tools/bin/gofumports: tools/check/go.mod
cd tools/check; test -e ../bin/gofumports || \
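The build targets above now compile the binaries from entry points under cmd/ (./cmd/cdc/main.go and ./cmd/kafka-consumer/main.go) instead of the old top-level main.go files; the moved files themselves presumably correspond to the "File renamed without changes." entries later in this diff. As a rough orientation only, a cobra-style entry point of this shape is what such a cmd/<binary>/main.go usually contains — the package layout and command wiring below are assumptions, not the actual TiCDC code:

```go
// Illustrative sketch of a cmd/cdc/main.go-style entry point (assumed layout,
// not the real file): build the CLI with cobra and exit non-zero on error.
package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	rootCmd := &cobra.Command{
		Use:   "cdc",
		Short: "cdc is a change data capture tool (placeholder command tree)",
	}
	if err := rootCmd.Execute(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```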
1 change: 1 addition & 0 deletions cdc/capture.go
@@ -169,6 +169,7 @@ func (c *Capture) Run(ctx context.Context) (err error) {
// 1/3 of session ttl that `session.Done` can't be triggered even
// the lease is already revoked.
if cerror.ErrEtcdSessionDone.Equal(err) {
log.Warn("session is disconnected", zap.Error(err))
return cerror.ErrCaptureSuicide.GenWithStackByArgs()
}
lease, inErr := c.etcdClient.Client.TimeToLive(ctx, c.session.Lease())
1 change: 1 addition & 0 deletions cdc/capture/capture.go
@@ -267,6 +267,7 @@ func (c *Capture) runEtcdWorker(ctx cdcContext.Context, reactor orchestrator.Rea
switch {
case cerror.ErrEtcdSessionDone.Equal(err),
cerror.ErrLeaseExpired.Equal(err):
log.Warn("session is disconnected", zap.Error(err))
return cerror.ErrCaptureSuicide.GenWithStackByArgs()
}
lease, inErr := ctx.GlobalVars().EtcdClient.Client.TimeToLive(ctx, c.session.Lease())
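Both capture implementations (cdc/capture.go and cdc/capture/capture.go) now log a warning before turning an expired etcd session into ErrCaptureSuicide, which makes the restart path visible in the logs. A minimal sketch of the underlying liveness pattern, assuming an etcd v3.4-style client under go.etcd.io/etcd/clientv3: when the session's Done channel fires, the lease TTL is checked, and a non-positive TTL means the capture should exit and re-register. The helper name and the stand-in error below are illustrative only.

```go
// Hedged sketch of the session-liveness check, not TiCDC's actual code.
package main

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/concurrency"
)

// watchSession blocks until the etcd session ends, then checks whether the
// lease is really gone before asking the caller to restart the capture.
func watchSession(ctx context.Context, cli *clientv3.Client, sess *concurrency.Session) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-sess.Done():
		log.Println("session is disconnected")
		resp, err := cli.TimeToLive(ctx, sess.Lease())
		if err != nil || resp.TTL <= 0 {
			// Stand-in for returning ErrCaptureSuicide.
			return context.Canceled
		}
		return nil
	}
}

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 5 * time.Second})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	sess, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
	if err != nil {
		log.Fatal(err)
	}
	defer sess.Close()

	_ = watchSession(context.Background(), cli, sess)
}
```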
2 changes: 1 addition & 1 deletion cdc/changefeed.go
@@ -249,7 +249,7 @@ func (c *changeFeed) updatePartition(tblInfo *timodel.TableInfo, startTs uint64)
newPartitionIDs := make([]int64, 0, len(pi.Definitions))
for _, partition := range pi.Definitions {
pid := partition.ID
_, ok := c.orphanTables[pid]
_, ok := oldIDs[pid]
if !ok {
// new partition.
c.orphanTables[pid] = startTs
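The one-line fix above makes updatePartition diff the new partition definitions against the table's previously recorded partition IDs (oldIDs) rather than against orphanTables, so newly added partitions get scheduled and dropped ones get cleaned up; TestUpdatePartition in the next file exercises exactly this behaviour. A standalone sketch of that diff logic, where diffPartitions is a hypothetical helper standing in for the relevant part of the changeFeed method:

```go
package main

import "fmt"

// diffPartitions: IDs present in the new definition but not in the old set are
// newly added partitions; IDs that disappeared are scheduled for cleanup.
func diffPartitions(oldIDs map[int64]struct{}, newDefs []int64) (added, removed []int64) {
	newSet := make(map[int64]struct{}, len(newDefs))
	for _, pid := range newDefs {
		newSet[pid] = struct{}{}
		if _, ok := oldIDs[pid]; !ok {
			added = append(added, pid)
		}
	}
	for pid := range oldIDs {
		if _, ok := newSet[pid]; !ok {
			removed = append(removed, pid)
		}
	}
	return added, removed
}

func main() {
	oldIDs := map[int64]struct{}{53: {}, 55: {}, 57: {}}
	added, removed := diffPartitions(oldIDs, []int64{57, 59, 61})
	// added = [59 61]; removed contains 53 and 55 (map iteration order is not fixed).
	fmt.Println(added, removed)
}
```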
37 changes: 37 additions & 0 deletions cdc/changefeed_test.go
@@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/check"
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/etcd"
@@ -128,3 +129,39 @@ func (s *changefeedSuite) TestHandleMoveTableJobs(c *check.C) {
c.Assert(changefeed.orphanTables, check.HasKey, model.TableID(1))
c.Assert(changefeed.moveTableJobs, check.HasLen, 0)
}

func (s *changefeedSuite) TestUpdatePartition(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)

cf := changeFeed{
partitions: map[model.TableID][]int64{
51: {53, 55, 57},
},
orphanTables: make(map[model.TableID]model.Ts),
toCleanTables: make(map[model.TableID]model.Ts),
}
tblInfo := &timodel.TableInfo{
ID: 51,
Partition: &timodel.PartitionInfo{
Enable: true,
Definitions: []timodel.PartitionDefinition{
{ID: 57}, {ID: 59}, {ID: 61},
},
},
}
startTs := uint64(100)

cf.updatePartition(tblInfo, startTs)
c.Assert(cf.orphanTables, check.DeepEquals, map[model.TableID]model.Ts{
59: startTs,
61: startTs,
})
c.Assert(cf.toCleanTables, check.DeepEquals, map[model.TableID]model.Ts{
53: startTs,
55: startTs,
})
c.Assert(cf.partitions, check.DeepEquals, map[model.TableID][]int64{
51: {57, 59, 61},
})
}
2 changes: 2 additions & 0 deletions cdc/kv/etcd_test.go
@@ -225,6 +225,7 @@ func (s *etcdSuite) TestOpChangeFeedDetail(c *check.C) {
ctx := context.Background()
detail := &model.ChangeFeedInfo{
SinkURI: "root@tcp(127.0.0.1:3306)/mysql",
SortDir: "/old-version/sorter",
}
cfID := "test-op-cf"

@@ -236,6 +237,7 @@
d, err := s.client.GetChangeFeedInfo(ctx, cfID)
c.Assert(err, check.IsNil)
c.Assert(d.SinkURI, check.Equals, detail.SinkURI)
c.Assert(d.SortDir, check.Equals, detail.SortDir)

err = s.client.LeaseGuardDeleteChangeFeedInfo(ctx, cfID, sess.Lease())
c.Assert(err, check.IsNil)
24 changes: 23 additions & 1 deletion cdc/model/changefeed.go
@@ -52,6 +52,26 @@
StateFinished FeedState = "finished"
)

// ToInt returns an int for each `FeedState`, only use this for metrics.
func (s FeedState) ToInt() int {
switch s {
case StateNormal:
return 0
case StateError:
return 1
case StateFailed:
return 2
case StateStopped:
return 3
case StateFinished:
return 4
case StateRemoved:
return 5
}
// -1 for unknown feed state
return -1
}

const (
// errorHistoryGCInterval represents how long we keep error record in changefeed info
errorHistoryGCInterval = time.Minute * 10
@@ -78,7 +98,9 @@ type ChangeFeedInfo struct {
AdminJobType AdminJobType `json:"admin-job-type"`
Engine SortEngine `json:"sort-engine"`
// SortDir is deprecated
SortDir string `json:"-"`
// it cannot be set by user in changefeed level, any assignment to it should be ignored.
// but can be fetched for backward compatibility
SortDir string `json:"sort-dir"`

Config *config.ReplicaConfig `json:"config"`
State FeedState `json:"state"`
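Two model changes here: FeedState gains a stable integer encoding (ToInt) for the new status metric, and SortDir's JSON tag changes from `json:"-"` (never serialized) to `json:"sort-dir"` so the deprecated field still round-trips for backward compatibility even though user assignments to it are ignored. A small sketch of what the tag change means for encoding/json, using throwaway structs rather than the real ChangeFeedInfo:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Illustrative structs only; the real ChangeFeedInfo has many more fields.
type infoOld struct {
	SortDir string `json:"-"` // dropped on Marshal, ignored on Unmarshal
}

type infoNew struct {
	SortDir string `json:"sort-dir"` // kept so existing values still round-trip
}

func main() {
	o, _ := json.Marshal(infoOld{SortDir: "/tmp/sorter"})
	n, _ := json.Marshal(infoNew{SortDir: "/tmp/sorter"})
	fmt.Println(string(o)) // {}
	fmt.Println(string(n)) // {"sort-dir":"/tmp/sorter"}

	var decoded infoNew
	_ = json.Unmarshal([]byte(`{"sort-dir":"/old-version/sorter"}`), &decoded)
	fmt.Println(decoded.SortDir) // /old-version/sorter
}
```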
1 change: 1 addition & 0 deletions cdc/model/changefeed_test.go
@@ -120,6 +120,7 @@ func (s *configSuite) TestFillV1(c *check.C) {
},
StartTs: 417136892416622595,
Engine: "memory",
SortDir: ".",
Config: &config.ReplicaConfig{
CaseSensitive: true,
Filter: &config.FilterConfig{
7 changes: 3 additions & 4 deletions cdc/owner.go
@@ -52,7 +52,7 @@ type ownership struct {
tickTime time.Duration
}

func newOwnersip(tickTime time.Duration) ownership {
func newOwnership(tickTime time.Duration) ownership {
minTickTime := 5 * time.Second
if tickTime > minTickTime {
log.Panic("ownership counter must be incearsed every 5 seconds")
@@ -1112,7 +1112,6 @@ func (o *Owner) handleAdminJob(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}

err = o.dispatchJob(ctx, job)
if err != nil {
return errors.Trace(err)
@@ -1278,7 +1277,7 @@ func (o *Owner) Run(ctx context.Context, tickTime time.Duration) error {
defer feedChangeReceiver.Stop()
o.watchFeedChange(ctx1)

ownership := newOwnersip(tickTime)
ownership := newOwnership(tickTime)
loop:
for {
select {
@@ -1580,7 +1579,7 @@ func (o *Owner) watchCapture(ctx context.Context) error {
failpoint.Inject("sleep-before-watch-capture", nil)

// When an owner just starts, changefeed information is not updated at once.
// Supposing a crased capture should be removed now, the owner will miss deleting
// Supposing a crashed capture should be removed now, the owner will miss deleting
// task status and task position if changefeed information is not loaded.
// If the task positions and status decode failed, remove them.
if err := o.checkAndCleanTasksInfo(ctx); err != nil {
8 changes: 8 additions & 0 deletions cdc/owner/metrics.go
@@ -44,6 +44,13 @@ var (
Name: "maintain_table_num",
Help: "number of replicated tables maintained in owner",
}, []string{"changefeed", "capture", "type"})
changefeedStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "status",
Help: "The status of changefeeds",
}, []string{"changefeed"})
)

const (
@@ -59,4 +66,5 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(changefeedCheckpointTsLagGauge)
registry.MustRegister(ownershipCounter)
registry.MustRegister(ownerMaintainTableNumGauge)
registry.MustRegister(changefeedStatusGauge)
}
4 changes: 4 additions & 0 deletions cdc/owner/owner.go
@@ -235,6 +235,7 @@ func (o *Owner) updateMetrics(state *model.GlobalReactorState) {
o.lastTickTime = now

ownerMaintainTableNumGauge.Reset()
changefeedStatusGauge.Reset()
for changefeedID, changefeedState := range state.Changefeeds {
for captureID, captureInfo := range state.Captures {
taskStatus, exist := changefeedState.TaskStatuses[captureID]
@@ -243,6 +244,9 @@
}
ownerMaintainTableNumGauge.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeTotal).Set(float64(len(taskStatus.Tables)))
ownerMaintainTableNumGauge.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeWip).Set(float64(len(taskStatus.Operation)))
if changefeedState.Info != nil {
changefeedStatusGauge.WithLabelValues(changefeedID).Set(float64(changefeedState.Info.State.ToInt()))
}
}
}
}
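changefeedStatusGauge is reset on every metrics tick and re-populated from FeedState.ToInt(), so the exported gauge (ticdc_owner_status, from the namespace/subsystem/name above) carries the integer encoding defined earlier: 0 for normal through 5 for removed, with -1 for unknown. A hedged, test-style sketch of how that encoding surfaces through a gauge, using client_golang's testutil; the gauge below is a local stand-in, not the one registered by InitMetrics:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Local stand-in mirroring changefeedStatusGauge's options.
	statusGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "owner",
		Name:      "status",
		Help:      "The status of changefeeds",
	}, []string{"changefeed"})

	// 0 corresponds to StateNormal in FeedState.ToInt().
	statusGauge.WithLabelValues("cf-1").Set(0)

	fmt.Println(testutil.ToFloat64(statusGauge.WithLabelValues("cf-1"))) // 0
}
```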
4 changes: 2 additions & 2 deletions cdc/puller/sorter/backend_pool.go
@@ -158,7 +158,7 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) {
if err != nil {
log.Warn("Cannot remove temporary file for sorting", zap.String("file", backEnd.fileName), zap.Error(err))
} else {
log.Info("Temporary file removed", zap.String("file", backEnd.fileName))
log.Debug("Temporary file removed", zap.String("file", backEnd.fileName))
freedCount += 1
}
if freedCount >= 16 {
@@ -371,7 +371,7 @@ func (p *backEndPool) cleanUpStaleFiles() error {
}

for _, toRemoveFilePath := range files {
log.Info("Removing stale sorter temporary file", zap.String("file", toRemoveFilePath))
log.Debug("Removing stale sorter temporary file", zap.String("file", toRemoveFilePath))
err := os.Remove(toRemoveFilePath)
if err != nil {
// In production, we do not want an error here to interfere with normal operation,
2 changes: 1 addition & 1 deletion cdc/puller/sorter/merger.go
@@ -55,7 +55,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch
pendingSet := &sync.Map{}

defer func() {
log.Info("Unified Sorter: merger exiting, cleaning up resources")
log.Debug("Unified Sorter: merger exiting, cleaning up resources")
// cancel pending async IO operations.
onExit()
cleanUpTask := func(task *flushTask) {
25 changes: 17 additions & 8 deletions cdc/server.go
@@ -374,10 +374,8 @@ func (s *Server) initDataDir(ctx context.Context) error {
return errors.Trace(err)
}

if diskInfo.Avail < dataDirThreshold {
log.Warn(fmt.Sprintf("%s is set as data-dir (%dGB available), ticdc recommend disk for data-dir "+
"at least have %dGB available space", conf.DataDir, diskInfo.Avail, dataDirThreshold))
}
log.Info(fmt.Sprintf("%s is set as data-dir (%dGB available), ticdc recommend disk for data-dir "+
"at least have %dGB available space", conf.DataDir, diskInfo.Avail, dataDirThreshold))

return nil
}
@@ -440,14 +438,25 @@ func (s *Server) setUpDataDir(ctx context.Context) error {
// at the moment, only consider available disk space
func findBestDataDir(candidates []string) (result string, ok bool) {
var low uint64 = 0
for _, dir := range candidates {

checker := func(dir string) (*util.DiskInfo, error) {
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, err
}
if err := util.IsDirReadWritable(dir); err != nil {
log.Warn("try to get disk info failed", zap.String("dir", dir), zap.Error(err))
continue
return nil, err
}
info, err := util.GetDiskInfo(dir)
if err != nil {
log.Warn("try to get disk info failed", zap.String("dir", dir), zap.Error(err))
return nil, err
}
return info, err
}

for _, dir := range candidates {
info, err := checker(dir)
if err != nil {
log.Warn("check the availability of dir", zap.String("dir", dir), zap.Error(err))
continue
}
if info.Avail > low {
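findBestDataDir is reworked so that every candidate directory goes through a single checker closure (create if missing, probe read/write access, query disk stats), and the candidate with the most available space wins; candidates that fail any step are skipped with a warning instead of aborting the search. A rough standalone sketch of the same selection idea, where availBytes is a hypothetical stand-in for util.GetDiskInfo rather than the real helper:

```go
//go:build linux || darwin

package main

import (
	"fmt"
	"os"
	"syscall"
)

// availBytes reports the free space (in bytes) of the filesystem holding dir,
// creating dir first if necessary. Hypothetical stand-in for util.GetDiskInfo.
func availBytes(dir string) (uint64, error) {
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return 0, err
	}
	var st syscall.Statfs_t
	if err := syscall.Statfs(dir, &st); err != nil {
		return 0, err
	}
	return st.Bavail * uint64(st.Bsize), nil
}

// bestDataDir mirrors the "pick the candidate with the most available space"
// loop: unusable candidates are skipped rather than failing the whole search.
func bestDataDir(candidates []string) (string, bool) {
	var best string
	var most uint64
	for _, dir := range candidates {
		avail, err := availBytes(dir)
		if err != nil {
			continue
		}
		if avail > most {
			most = avail
			best = dir
		}
	}
	return best, best != ""
}

func main() {
	dir, ok := bestDataDir([]string{"/tmp/cdc-data", os.TempDir()})
	fmt.Println(dir, ok)
}
```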
File renamed without changes.
File renamed without changes.
File renamed without changes.
5 changes: 5 additions & 0 deletions errors.toml
@@ -636,6 +636,11 @@ error = '''
pulsar send message failed
'''

["CDC:ErrReachMaxTry"]
error = '''
reach maximum try: %d
'''

["CDC:ErrReactorFinished"]
error = '''
the reactor has done its job and should no longer be executed
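errors.toml gains the generated entry for CDC:ErrReachMaxTry, whose message embeds the retry budget ("reach maximum try: %d"). As a loose illustration of the kind of retry loop that surfaces such an error — written with plain fmt.Errorf rather than TiCDC's error framework, and with an invented helper name:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retry runs fn up to maxTries times and, on exhaustion, returns an error whose
// message mirrors the shape of CDC:ErrReachMaxTry ("reach maximum try: %d").
func retry(maxTries int, fn func() error) error {
	var lastErr error
	for i := 0; i < maxTries; i++ {
		if lastErr = fn(); lastErr == nil {
			return nil
		}
		time.Sleep(50 * time.Millisecond) // fixed backoff keeps the sketch simple
	}
	return fmt.Errorf("reach maximum try: %d: %w", maxTries, lastErr)
}

func main() {
	err := retry(3, func() error { return errors.New("transient failure") })
	fmt.Println(err)
}
```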