Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix changefeed checkpoint lag negative value error (#3013) #3533

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
pd "github.com/tikv/pd/client"
Expand All @@ -54,10 +55,11 @@ type Capture struct {
session *concurrency.Session
election *concurrency.Election

pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *kv.CDCEtcdClient
grpcPool kv.GrpcPool
pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *kv.CDCEtcdClient
grpcPool kv.GrpcPool
TimeAcquirer pdtime.TimeAcquirer

cancel context.CancelFunc

Expand Down Expand Up @@ -98,6 +100,12 @@ func (c *Capture) reset(ctx context.Context) error {
}
c.session = sess
c.election = concurrency.NewElection(sess, kv.CaptureOwnerKey)

if c.TimeAcquirer != nil {
c.TimeAcquirer.Stop()
}
c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient)

if c.grpcPool != nil {
c.grpcPool.Close()
}
Expand Down Expand Up @@ -146,11 +154,12 @@ func (c *Capture) Run(ctx context.Context) error {

func (c *Capture) run(stdCtx context.Context) error {
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
TimeAcquirer: c.TimeAcquirer,
})
err := c.register(ctx)
if err != nil {
Expand All @@ -164,7 +173,7 @@ func (c *Capture) run(stdCtx context.Context) error {
cancel()
}()
wg := new(sync.WaitGroup)
wg.Add(3)
wg.Add(4)
var ownerErr, processorErr error
go func() {
defer wg.Done()
Expand All @@ -186,6 +195,10 @@ func (c *Capture) run(stdCtx context.Context) error {
processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval)
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
go func() {
defer wg.Done()
c.TimeAcquirer.Run(ctx)
}()
go func() {
defer wg.Done()
c.grpcPool.RecycleConn(ctx)
Expand Down
13 changes: 6 additions & 7 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package owner
import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -178,7 +177,9 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactor
return errors.Trace(err)
}
if shouldUpdateState {
c.updateStatus(barrierTs)
pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()
currentTs := oracle.GetPhysical(pdTime)
c.updateStatus(currentTs, barrierTs)
}
return nil
}
Expand Down Expand Up @@ -438,7 +439,7 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
return done, nil
}

func (c *changefeed) updateStatus(barrierTs model.Ts) {
func (c *changefeed) updateStatus(currentTs int64, barrierTs model.Ts) {
resolvedTs := barrierTs
for _, position := range c.state.TaskPositions {
if resolvedTs > position.ResolvedTs {
Expand Down Expand Up @@ -470,12 +471,10 @@ func (c *changefeed) updateStatus(barrierTs model.Ts) {
}
return status, changed, nil
})

phyTs := oracle.ExtractPhysical(checkpointTs)

c.metricsChangefeedCheckpointTsGauge.Set(float64(phyTs))
// It is more accurate to get tso from PD, but in most cases since we have
// deployed NTP service, a little bias is acceptable here.
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3)
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyTs) / 1e3)
}

func (c *changefeed) Close() {
Expand Down
2 changes: 2 additions & 0 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/ticdc/pkg/version"
Expand Down Expand Up @@ -216,6 +217,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
TimeAcquirer: pdtime.NewTimeAcquirer4Test(),
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test",
Expand Down
1 change: 1 addition & 0 deletions cdc/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (m *Manager) Tick(stdCtx context.Context, state orchestrator.ReactorState)
if err := m.handleCommand(); err != nil {
return state, err
}

captureID := ctx.GlobalVars().CaptureInfo.ID
var inactiveChangefeedCount int
for changefeedID, changefeedState := range globalState.Changefeeds {
Expand Down
27 changes: 14 additions & 13 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *model.ChangefeedReactorS
if !p.checkChangefeedNormal() {
return nil, cerror.ErrAdminStopProcessor.GenWithStackByArgs()
}
if skip := p.checkPosition(); skip {
// we should skip this tick after create a task position
if p.createTaskPosition() {
return p.changefeed, nil
}
if err := p.handleErrorCh(ctx); err != nil {
Expand All @@ -176,7 +177,11 @@ func (p *processor) tick(ctx cdcContext.Context, state *model.ChangefeedReactorS
if err := p.checkTablesNum(ctx); err != nil {
return nil, errors.Trace(err)
}
p.handlePosition()
// there is no need to check the err here, because we will use
// the local time when an error is returned, which is acceptable
pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()

p.handlePosition(oracle.GetPhysical(pdTime))
p.pushResolvedTs2Table()
p.handleWorkload()
p.doGCSchemaStorage()
Expand All @@ -194,10 +199,10 @@ func (p *processor) checkChangefeedNormal() bool {
return true
}

// checkPosition create a new task position, and put it into the etcd state.
// task position maybe be not exist only when the processor is running first time.
func (p *processor) checkPosition() (skipThisTick bool) {
if p.changefeed.TaskPositions[p.captureInfo.ID] != nil {
// createTaskPosition will create a new task position if a task position does not exist.
// The task position does not exist only when the processor is running its first tick.
func (p *processor) createTaskPosition() (skipThisTick bool) {
if _, exist := p.changefeed.TaskPositions[p.captureInfo.ID]; exist {
return false
}
if p.initialized {
Expand Down Expand Up @@ -559,7 +564,7 @@ func (p *processor) checkTablesNum(ctx cdcContext.Context) error {
}

// handlePosition calculates the local resolved ts and local checkpoint ts
func (p *processor) handlePosition() {
func (p *processor) handlePosition(currentTs int64) {
minResolvedTs := uint64(math.MaxUint64)
if p.schemaStorage != nil {
minResolvedTs = p.schemaStorage.ResolvedTs()
Expand All @@ -580,15 +585,11 @@ func (p *processor) handlePosition() {
}

resolvedPhyTs := oracle.ExtractPhysical(minResolvedTs)
// It is more accurate to get tso from PD, but in most cases we have
// deployed NTP service, a little bias is acceptable here.
p.metricResolvedTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-resolvedPhyTs) / 1e3)
p.metricResolvedTsLagGauge.Set(float64(currentTs-resolvedPhyTs) / 1e3)
p.metricResolvedTsGauge.Set(float64(resolvedPhyTs))

checkpointPhyTs := oracle.ExtractPhysical(minCheckpointTs)
// It is more accurate to get tso from PD, but in most cases we have
// deployed NTP service, a little bias is acceptable here.
p.metricCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-checkpointPhyTs) / 1e3)
p.metricCheckpointTsLagGauge.Set(float64(currentTs-checkpointPhyTs) / 1e3)
p.metricCheckpointTsGauge.Set(float64(checkpointPhyTs))

// minResolvedTs and minCheckpointTs may less than global resolved ts and global checkpoint ts when a new table added, the startTs of the new table is less than global checkpoint ts.
Expand Down
16 changes: 9 additions & 7 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
"log"
"time"

"github.com/pingcap/ticdc/pkg/version"

"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
pd "github.com/tikv/pd/client"
Expand All @@ -33,11 +33,12 @@ import (
// the lifecycle of vars in the GlobalVars should be aligned with the ticdc server process.
// All field in Vars should be READ-ONLY and THREAD-SAFE
type GlobalVars struct {
PDClient pd.Client
KVStorage tidbkv.Storage
CaptureInfo *model.CaptureInfo
EtcdClient *kv.CDCEtcdClient
GrpcPool kv.GrpcPool
PDClient pd.Client
KVStorage tidbkv.Storage
CaptureInfo *model.CaptureInfo
EtcdClient *kv.CDCEtcdClient
GrpcPool kv.GrpcPool
TimeAcquirer pdtime.TimeAcquirer
}

// ChangefeedVars contains some vars which can be used anywhere in a pipeline
Expand Down Expand Up @@ -184,6 +185,7 @@ func NewBackendContext4Test(withChangefeedVars bool) Context {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
TimeAcquirer: pdtime.NewTimeAcquirer4Test(),
})
if withChangefeedVars {
ctx = WithChangefeedVars(ctx, &ChangefeedVars{
Expand Down
6 changes: 4 additions & 2 deletions pkg/orchestrator/etcd_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"regexp"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -223,7 +224,6 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) {
defer func() {
_ = cli.Unwrap().Close()
}()

_, err := cli.Put(ctx, testEtcdKeyPrefix+"/sum", "0")
c.Check(err, check.IsNil)

Expand Down Expand Up @@ -272,7 +272,9 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) {
}

err = errg.Wait()
if err != nil && (errors.Cause(err) == context.DeadlineExceeded || errors.Cause(err) == context.Canceled) {
if err != nil && (errors.Cause(err) == context.DeadlineExceeded ||
errors.Cause(err) == context.Canceled ||
strings.Contains(err.Error(), "etcdserver: request timeout")) {
return
}
c.Check(err, check.IsNil)
Expand Down
Loading