From a99bbbc2c42c02d32433dd6c08388b045c8dcc0b Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Fri, 19 Nov 2021 00:49:48 +0800 Subject: [PATCH 1/5] This is an automated cherry-pick of #3013 Signed-off-by: ti-chi-bot --- cdc/capture/capture.go | 35 +++++++- cdc/owner/changefeed.go | 13 ++- cdc/owner/changefeed_test.go | 2 + cdc/processor/manager.go | 1 + cdc/processor/processor.go | 32 +++++--- pkg/context/context.go | 17 ++++ pkg/orchestrator/etcd_worker_test.go | 6 +- pkg/pdtime/acquirer.go | 118 +++++++++++++++++++++++++++ pkg/pdtime/acquirer_test.go | 53 ++++++++++++ pkg/pdtime/main_test.go | 24 ++++++ pkg/txnutil/gc/gc_manager.go | 33 +++----- pkg/txnutil/gc/gc_manager_test.go | 45 ++++------ 12 files changed, 304 insertions(+), 75 deletions(-) create mode 100644 pkg/pdtime/acquirer.go create mode 100644 pkg/pdtime/acquirer_test.go create mode 100644 pkg/pdtime/main_test.go diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index e9877414271..f76286bac5f 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -32,6 +32,7 @@ import ( cdcContext "github.com/pingcap/ticdc/pkg/context" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/orchestrator" + "github.com/pingcap/ticdc/pkg/pdtime" "github.com/pingcap/ticdc/pkg/version" tidbkv "github.com/pingcap/tidb/kv" pd "github.com/tikv/pd/client" @@ -54,10 +55,18 @@ type Capture struct { session *concurrency.Session election *concurrency.Election +<<<<<<< HEAD pdClient pd.Client kvStorage tidbkv.Storage etcdClient *kv.CDCEtcdClient grpcPool kv.GrpcPool +======= + pdClient pd.Client + kvStorage tidbkv.Storage + etcdClient *etcd.CDCEtcdClient + grpcPool kv.GrpcPool + TimeAcquirer pdtime.TimeAcquirer +>>>>>>> c91af794e (*: fix changefeed checkpoint lag negative value error (#3013)) cancel context.CancelFunc @@ -97,7 +106,17 @@ func (c *Capture) reset(ctx context.Context) error { return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session") } c.session = sess +<<<<<<< HEAD c.election = concurrency.NewElection(sess, kv.CaptureOwnerKey) +======= + c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey) + + if c.TimeAcquirer != nil { + c.TimeAcquirer.Stop() + } + c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient) + +>>>>>>> c91af794e (*: fix changefeed checkpoint lag negative value error (#3013)) if c.grpcPool != nil { c.grpcPool.Close() } @@ -146,11 +165,21 @@ func (c *Capture) Run(ctx context.Context) error { func (c *Capture) run(stdCtx context.Context) error { ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{ +<<<<<<< HEAD PDClient: c.pdClient, KVStorage: c.kvStorage, CaptureInfo: c.info, EtcdClient: c.etcdClient, GrpcPool: c.grpcPool, +======= + PDClient: c.pdClient, + KVStorage: c.kvStorage, + CaptureInfo: c.info, + EtcdClient: c.etcdClient, + GrpcPool: c.grpcPool, + TimeAcquirer: c.TimeAcquirer, + TableActorSystem: c.tableActorSystem, +>>>>>>> c91af794e (*: fix changefeed checkpoint lag negative value error (#3013)) }) err := c.register(ctx) if err != nil { @@ -164,7 +193,7 @@ func (c *Capture) run(stdCtx context.Context) error { cancel() }() wg := new(sync.WaitGroup) - wg.Add(3) + wg.Add(4) var ownerErr, processorErr error go func() { defer wg.Done() @@ -186,6 +215,10 @@ func (c *Capture) run(stdCtx context.Context) error { processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval) log.Info("the processor routine has exited", zap.Error(processorErr)) }() + go func() { + defer wg.Done() + c.TimeAcquirer.Run(ctx) + }() go func() { defer wg.Done() c.grpcPool.RecycleConn(ctx) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 3074e548b77..eef7a303236 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -16,7 +16,6 @@ package owner import ( "context" "sync" - "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -178,7 +177,9 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactor return errors.Trace(err) } if shouldUpdateState { - c.updateStatus(barrierTs) + pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached() + currentTs := oracle.GetPhysical(pdTime) + c.updateStatus(currentTs, barrierTs) } return nil } @@ -438,7 +439,7 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don return done, nil } -func (c *changefeed) updateStatus(barrierTs model.Ts) { +func (c *changefeed) updateStatus(currentTs int64, barrierTs model.Ts) { resolvedTs := barrierTs for _, position := range c.state.TaskPositions { if resolvedTs > position.ResolvedTs { @@ -470,12 +471,10 @@ func (c *changefeed) updateStatus(barrierTs model.Ts) { } return status, changed, nil }) - phyTs := oracle.ExtractPhysical(checkpointTs) + c.metricsChangefeedCheckpointTsGauge.Set(float64(phyTs)) - // It is more accurate to get tso from PD, but in most cases since we have - // deployed NTP service, a little bias is acceptable here. - c.metricsChangefeedCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3) + c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyTs) / 1e3) } func (c *changefeed) Close() { diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index e909125ef51..b44267c66ce 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/ticdc/pkg/config" cdcContext "github.com/pingcap/ticdc/pkg/context" "github.com/pingcap/ticdc/pkg/orchestrator" + "github.com/pingcap/ticdc/pkg/pdtime" "github.com/pingcap/ticdc/pkg/txnutil/gc" "github.com/pingcap/ticdc/pkg/util/testleak" "github.com/pingcap/ticdc/pkg/version" @@ -216,6 +217,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) { AdvertiseAddr: "127.0.0.1:0000", Version: version.ReleaseVersion, }, + TimeAcquirer: pdtime.NewTimeAcquirer4Test(), }) ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: "changefeed-id-test", diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go index 3d14e4c19c4..8ab7291784c 100644 --- a/cdc/processor/manager.go +++ b/cdc/processor/manager.go @@ -82,6 +82,7 @@ func (m *Manager) Tick(stdCtx context.Context, state orchestrator.ReactorState) if err := m.handleCommand(); err != nil { return state, err } + captureID := ctx.GlobalVars().CaptureInfo.ID var inactiveChangefeedCount int for changefeedID, changefeedState := range globalState.Changefeeds { diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 037690c0204..8fc2cf4c33b 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -161,7 +161,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *model.ChangefeedReactorS if !p.checkChangefeedNormal() { return nil, cerror.ErrAdminStopProcessor.GenWithStackByArgs() } - if skip := p.checkPosition(); skip { + // we should skip this tick after create a task position + if p.createTaskPosition() { return p.changefeed, nil } if err := p.handleErrorCh(ctx); err != nil { @@ -176,7 +177,18 @@ func (p *processor) tick(ctx cdcContext.Context, state *model.ChangefeedReactorS if err := p.checkTablesNum(ctx); err != nil { return nil, errors.Trace(err) } +<<<<<<< HEAD p.handlePosition() +======= + if err := p.flushRedoLogMeta(ctx); err != nil { + return nil, err + } + // it is no need to check the err here, because we will use + // local time when an error return, which is acceptable + pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached() + + p.handlePosition(oracle.GetPhysical(pdTime)) +>>>>>>> c91af794e (*: fix changefeed checkpoint lag negative value error (#3013)) p.pushResolvedTs2Table() p.handleWorkload() p.doGCSchemaStorage() @@ -194,10 +206,10 @@ func (p *processor) checkChangefeedNormal() bool { return true } -// checkPosition create a new task position, and put it into the etcd state. -// task position maybe be not exist only when the processor is running first time. -func (p *processor) checkPosition() (skipThisTick bool) { - if p.changefeed.TaskPositions[p.captureInfo.ID] != nil { +// createTaskPosition will create a new task position if a task position does not exist. +// task position not exist only when the processor is running first in the first tick. +func (p *processor) createTaskPosition() (skipThisTick bool) { + if _, exist := p.changefeed.TaskPositions[p.captureInfo.ID]; exist { return false } if p.initialized { @@ -559,7 +571,7 @@ func (p *processor) checkTablesNum(ctx cdcContext.Context) error { } // handlePosition calculates the local resolved ts and local checkpoint ts -func (p *processor) handlePosition() { +func (p *processor) handlePosition(currentTs int64) { minResolvedTs := uint64(math.MaxUint64) if p.schemaStorage != nil { minResolvedTs = p.schemaStorage.ResolvedTs() @@ -580,15 +592,11 @@ func (p *processor) handlePosition() { } resolvedPhyTs := oracle.ExtractPhysical(minResolvedTs) - // It is more accurate to get tso from PD, but in most cases we have - // deployed NTP service, a little bias is acceptable here. - p.metricResolvedTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-resolvedPhyTs) / 1e3) + p.metricResolvedTsLagGauge.Set(float64(currentTs-resolvedPhyTs) / 1e3) p.metricResolvedTsGauge.Set(float64(resolvedPhyTs)) checkpointPhyTs := oracle.ExtractPhysical(minCheckpointTs) - // It is more accurate to get tso from PD, but in most cases we have - // deployed NTP service, a little bias is acceptable here. - p.metricCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-checkpointPhyTs) / 1e3) + p.metricCheckpointTsLagGauge.Set(float64(currentTs-checkpointPhyTs) / 1e3) p.metricCheckpointTsGauge.Set(float64(checkpointPhyTs)) // minResolvedTs and minCheckpointTs may less than global resolved ts and global checkpoint ts when a new table added, the startTs of the new table is less than global checkpoint ts. diff --git a/pkg/context/context.go b/pkg/context/context.go index 880bc082f70..8db99b40e68 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -23,6 +23,12 @@ import ( "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/config" +<<<<<<< HEAD +======= + "github.com/pingcap/ticdc/pkg/etcd" + "github.com/pingcap/ticdc/pkg/pdtime" + "github.com/pingcap/ticdc/pkg/version" +>>>>>>> c91af794e (*: fix changefeed checkpoint lag negative value error (#3013)) tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/oracle" pd "github.com/tikv/pd/client" @@ -33,11 +39,21 @@ import ( // the lifecycle of vars in the GlobalVars shoule be aligned with the ticdc server process. // All field in Vars should be READ-ONLY and THREAD-SAFE type GlobalVars struct { +<<<<<<< HEAD PDClient pd.Client KVStorage tidbkv.Storage CaptureInfo *model.CaptureInfo EtcdClient *kv.CDCEtcdClient GrpcPool kv.GrpcPool +======= + PDClient pd.Client + KVStorage tidbkv.Storage + CaptureInfo *model.CaptureInfo + EtcdClient *etcd.CDCEtcdClient + GrpcPool kv.GrpcPool + TimeAcquirer pdtime.TimeAcquirer + TableActorSystem *system.System +>>>>>>> c91af794e (*: fix changefeed checkpoint lag negative value error (#3013)) } // ChangefeedVars contains some vars which can be used anywhere in a pipeline @@ -184,6 +200,7 @@ func NewBackendContext4Test(withChangefeedVars bool) Context { AdvertiseAddr: "127.0.0.1:0000", Version: version.ReleaseVersion, }, + TimeAcquirer: pdtime.NewTimeAcquirer4Test(), }) if withChangefeedVars { ctx = WithChangefeedVars(ctx, &ChangefeedVars{ diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index 72cced6004f..ad0711f386d 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -18,6 +18,7 @@ import ( "encoding/json" "regexp" "strconv" + "strings" "sync" "testing" "time" @@ -223,7 +224,6 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) { defer func() { _ = cli.Unwrap().Close() }() - _, err := cli.Put(ctx, testEtcdKeyPrefix+"/sum", "0") c.Check(err, check.IsNil) @@ -272,7 +272,9 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) { } err = errg.Wait() - if err != nil && (errors.Cause(err) == context.DeadlineExceeded || errors.Cause(err) == context.Canceled) { + if err != nil && (errors.Cause(err) == context.DeadlineExceeded || + errors.Cause(err) == context.Canceled || + strings.Contains(err.Error(), "etcdserver: request timeout")) { return } c.Check(err, check.IsNil) diff --git a/pkg/pdtime/acquirer.go b/pkg/pdtime/acquirer.go new file mode 100644 index 00000000000..3fae739fa9d --- /dev/null +++ b/pkg/pdtime/acquirer.go @@ -0,0 +1,118 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdtime + +import ( + "context" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/retry" + "github.com/tikv/client-go/v2/oracle" + pd "github.com/tikv/pd/client" + "go.uber.org/zap" +) + +const pdTimeUpdateInterval = 200 * time.Millisecond + +type TimeAcquirer interface { + // Run run the TimeAcquirer + Run(ctx context.Context) + // CurrentTimeFromCached returns current time from cache + CurrentTimeFromCached() (time.Time, error) + // Stop stops the TimeAcquirer + Stop() +} + +// TimeAcquirerImpl cache time get from PD periodically and cache it +type TimeAcquirerImpl struct { + pdClient pd.Client + timeCache time.Time + mu sync.RWMutex + cancel context.CancelFunc + err error +} + +// NewTimeAcquirer return a new TimeAcquirer +func NewTimeAcquirer(pdClient pd.Client) TimeAcquirer { + return &TimeAcquirerImpl{ + pdClient: pdClient, + } +} + +// Run will get time from pd periodically to cache in pdPhysicalTimeCache +func (c *TimeAcquirerImpl) Run(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + c.cancel = cancel + ticker := time.NewTicker(pdTimeUpdateInterval) + for { + select { + // c.Stop() was called or parent ctx was canceled + case <-ctx.Done(): + log.Info("TimeAcquirer exit") + return + case <-ticker.C: + err := retry.Do(ctx, func() error { + physical, _, err := c.pdClient.GetTS(ctx) + if err != nil { + log.Info("get time from pd failed, retry later", zap.Error(err)) + return err + } + c.mu.Lock() + c.timeCache = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0)) + c.err = nil + c.mu.Unlock() + return nil + }, retry.WithBackoffBaseDelay(200), retry.WithMaxTries(10)) + if err != nil { + log.Warn("get time from pd failed, will use local time as pd time") + c.mu.Lock() + c.timeCache = time.Now() + c.err = err + c.mu.Unlock() + } + } + } +} + +// CurrentTimeFromCached return current time from pd cache +func (c *TimeAcquirerImpl) CurrentTimeFromCached() (time.Time, error) { + c.mu.RLock() + err := c.err + cacheTime := c.timeCache + c.mu.RUnlock() + return cacheTime, errors.Trace(err) +} + +func (c *TimeAcquirerImpl) Stop() { + c.cancel() +} + +type TimeAcquirer4Test struct{} + +func NewTimeAcquirer4Test() TimeAcquirer { + return &TimeAcquirer4Test{} +} + +func (c *TimeAcquirer4Test) CurrentTimeFromCached() (time.Time, error) { + return time.Now(), nil +} + +func (c *TimeAcquirer4Test) Run(ctx context.Context) { +} + +func (c *TimeAcquirer4Test) Stop() { +} diff --git a/pkg/pdtime/acquirer_test.go b/pkg/pdtime/acquirer_test.go new file mode 100644 index 00000000000..55b2950192e --- /dev/null +++ b/pkg/pdtime/acquirer_test.go @@ -0,0 +1,53 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdtime + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" + pd "github.com/tikv/pd/client" +) + +// MockPDClient mocks pd.Client to facilitate unit testing. +type MockPDClient struct { + pd.Client +} + +// GetTS implements pd.Client.GetTS. +func (m *MockPDClient) GetTS(ctx context.Context) (int64, int64, error) { + return oracle.GetPhysical(time.Now()), 0, nil +} + +func TestTimeFromPD(t *testing.T) { + t.Parallel() + mockPDClient := &MockPDClient{} + TimeAcquirer := NewTimeAcquirer(mockPDClient) + go TimeAcquirer.Run(context.Background()) + defer TimeAcquirer.Stop() + time.Sleep(1 * time.Second) + + t1, err := TimeAcquirer.CurrentTimeFromCached() + require.Nil(t, err) + + time.Sleep(400 * time.Millisecond) + // assume that the gc safe point updated one hour ago + t2, err := TimeAcquirer.CurrentTimeFromCached() + require.Nil(t, err) + // should return new time + require.NotEqual(t, t1, t2) +} diff --git a/pkg/pdtime/main_test.go b/pkg/pdtime/main_test.go new file mode 100644 index 00000000000..229d3e567f5 --- /dev/null +++ b/pkg/pdtime/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdtime + +import ( + "testing" + + "github.com/pingcap/ticdc/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/pkg/txnutil/gc/gc_manager.go b/pkg/txnutil/gc/gc_manager.go index 16a8fd6c5e8..2c55463f4f4 100644 --- a/pkg/txnutil/gc/gc_manager.go +++ b/pkg/txnutil/gc/gc_manager.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/config" + cdcContext "github.com/pingcap/ticdc/pkg/context" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/tidb/store/tikv/oracle" pd "github.com/tikv/pd/client" @@ -31,7 +32,6 @@ import ( const ( // CDCServiceSafePointID is the ID of CDC service in pd.UpdateServiceGCSafePoint. CDCServiceSafePointID = "ticdc" - pdTimeUpdateInterval = 10 * time.Minute ) // gcSafepointUpdateInterval is the minimum interval that CDC can update gc safepoint @@ -43,32 +43,27 @@ type Manager interface { // Manager may skip update when it thinks it is too frequent. // Set `forceUpdate` to force Manager update. TryUpdateGCSafePoint(ctx context.Context, checkpointTs model.Ts, forceUpdate bool) error - CurrentTimeFromPDCached(ctx context.Context) (time.Time, error) CheckStaleCheckpointTs(ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts) error } type gcManager struct { pdClient pd.Client - - gcTTL int64 + gcTTL int64 lastUpdatedTime time.Time lastSucceededTime time.Time lastSafePointTs uint64 isTiCDCBlockGC bool - - pdPhysicalTimeCache time.Time - lastUpdatedPdTime time.Time } // NewManager creates a new Manager. -func NewManager(pdClint pd.Client) Manager { +func NewManager(pdClient pd.Client) Manager { serverConfig := config.GetGlobalServerConfig() failpoint.Inject("InjectGcSafepointUpdateInterval", func(val failpoint.Value) { gcSafepointUpdateInterval = time.Duration(val.(int) * int(time.Millisecond)) }) return &gcManager{ - pdClient: pdClint, + pdClient: pdClient, lastSucceededTime: time.Now(), gcTTL: serverConfig.GcTTL, } @@ -111,25 +106,17 @@ func (m *gcManager) TryUpdateGCSafePoint( return nil } -func (m *gcManager) CurrentTimeFromPDCached(ctx context.Context) (time.Time, error) { - if time.Since(m.lastUpdatedPdTime) <= pdTimeUpdateInterval { - return m.pdPhysicalTimeCache, nil - } - physical, logical, err := m.pdClient.GetTS(ctx) - if err != nil { - return time.Now(), errors.Trace(err) - } - m.pdPhysicalTimeCache = oracle.GetTimeFromTS(oracle.ComposeTS(physical, logical)) - m.lastUpdatedPdTime = time.Now() - return m.pdPhysicalTimeCache, nil -} - func (m *gcManager) CheckStaleCheckpointTs( ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts, ) error { gcSafepointUpperBound := checkpointTs - 1 if m.isTiCDCBlockGC { - pdTime, err := m.CurrentTimeFromPDCached(ctx) + cctx, ok := ctx.(cdcContext.Context) + if !ok { + return cerror.ErrOwnerUnknown.GenWithStack("ctx not an cdcContext.Context, it should be") + } + pdTime, err := cctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached() + // TODO: should we return err here, or just log it? if err != nil { return errors.Trace(err) } diff --git a/pkg/txnutil/gc/gc_manager_test.go b/pkg/txnutil/gc/gc_manager_test.go index 3fa2c09e1a2..3423b58855a 100644 --- a/pkg/txnutil/gc/gc_manager_test.go +++ b/pkg/txnutil/gc/gc_manager_test.go @@ -18,6 +18,8 @@ import ( "testing" "time" + "github.com/pingcap/ticdc/pkg/pdtime" + "github.com/pingcap/check" "github.com/pingcap/errors" cdcContext "github.com/pingcap/ticdc/pkg/context" @@ -87,49 +89,32 @@ func (s *gcManagerSuite) TestUpdateGCSafePoint(c *check.C) { } } -func (s *gcManagerSuite) TestTimeFromPD(c *check.C) { - defer testleak.AfterTest(c)() - mockPDClient := &MockPDClient{} - gcManager := NewManager(mockPDClient).(*gcManager) - ctx := cdcContext.NewBackendContext4Test(true) - ctx.GlobalVars().PDClient = mockPDClient - t1, err := gcManager.CurrentTimeFromPDCached(ctx) - c.Assert(err, check.IsNil) - c.Assert(t1, check.Equals, gcManager.pdPhysicalTimeCache) - - time.Sleep(50 * time.Millisecond) - // should return cached time - t2, err := gcManager.CurrentTimeFromPDCached(ctx) - c.Assert(err, check.IsNil) - c.Assert(t2, check.Equals, gcManager.pdPhysicalTimeCache) - c.Assert(t2, check.Equals, t1) - - time.Sleep(50 * time.Millisecond) - // assume that the gc safe point updated one hour ago - gcManager.lastUpdatedPdTime = time.Now().Add(-time.Hour) - t3, err := gcManager.CurrentTimeFromPDCached(ctx) - c.Assert(err, check.IsNil) - c.Assert(t3, check.Equals, gcManager.pdPhysicalTimeCache) - // should return new time - c.Assert(t3, check.Not(check.Equals), t2) -} - func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) { defer testleak.AfterTest(c)() mockPDClient := &MockPDClient{} gcManager := NewManager(mockPDClient).(*gcManager) gcManager.isTiCDCBlockGC = true ctx := context.Background() - err := gcManager.CheckStaleCheckpointTs(ctx, "cfID", 10) + + TimeAcquirer := pdtime.NewTimeAcquirer(mockPDClient) + go TimeAcquirer.Run(ctx) + time.Sleep(1 * time.Second) + defer TimeAcquirer.Stop() + + cCtx := cdcContext.NewContext(ctx, &cdcContext.GlobalVars{ + TimeAcquirer: TimeAcquirer, + }) + + err := gcManager.CheckStaleCheckpointTs(cCtx, "cfID", 10) c.Assert(cerror.ErrGCTTLExceeded.Equal(errors.Cause(err)), check.IsTrue) c.Assert(cerror.ChangefeedFastFailError(err), check.IsTrue) - err = gcManager.CheckStaleCheckpointTs(ctx, "cfID", oracle.GoTimeToTS(time.Now())) + err = gcManager.CheckStaleCheckpointTs(cCtx, "cfID", oracle.GoTimeToTS(time.Now())) c.Assert(err, check.IsNil) gcManager.isTiCDCBlockGC = false gcManager.lastSafePointTs = 20 - err = gcManager.CheckStaleCheckpointTs(ctx, "cfID", 10) + err = gcManager.CheckStaleCheckpointTs(cCtx, "cfID", 10) c.Assert(cerror.ErrSnapshotLostByGC.Equal(errors.Cause(err)), check.IsTrue) c.Assert(cerror.ChangefeedFastFailError(err), check.IsTrue) } From 06acfca39e2995a1aff370d9b028e03b5151d723 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 24 Nov 2021 18:04:17 +0800 Subject: [PATCH 2/5] *: resolves conflict --- cdc/capture/capture.go | 34 +++++++--------------------------- cdc/processor/processor.go | 7 ------- pkg/context/context.go | 27 ++++++--------------------- 3 files changed, 13 insertions(+), 55 deletions(-) diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index f76286bac5f..d44088cbda3 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -55,18 +55,11 @@ type Capture struct { session *concurrency.Session election *concurrency.Election -<<<<<<< HEAD - pdClient pd.Client - kvStorage tidbkv.Storage - etcdClient *kv.CDCEtcdClient - grpcPool kv.GrpcPool -======= pdClient pd.Client kvStorage tidbkv.Storage - etcdClient *etcd.CDCEtcdClient + etcdClient *kv.CDCEtcdClient grpcPool kv.GrpcPool TimeAcquirer pdtime.TimeAcquirer ->>>>>>> c91af794e (*: fix changefeed checkpoint lag negative value error (#3013)) cancel context.CancelFunc @@ -106,17 +99,13 @@ func (c *Capture) reset(ctx context.Context) error { return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session") } c.session = sess -<<<<<<< HEAD c.election = concurrency.NewElection(sess, kv.CaptureOwnerKey) -======= - c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey) if c.TimeAcquirer != nil { c.TimeAcquirer.Stop() } c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient) ->>>>>>> c91af794e (*: fix changefeed checkpoint lag negative value error (#3013)) if c.grpcPool != nil { c.grpcPool.Close() } @@ -165,21 +154,12 @@ func (c *Capture) Run(ctx context.Context) error { func (c *Capture) run(stdCtx context.Context) error { ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{ -<<<<<<< HEAD - PDClient: c.pdClient, - KVStorage: c.kvStorage, - CaptureInfo: c.info, - EtcdClient: c.etcdClient, - GrpcPool: c.grpcPool, -======= - PDClient: c.pdClient, - KVStorage: c.kvStorage, - CaptureInfo: c.info, - EtcdClient: c.etcdClient, - GrpcPool: c.grpcPool, - TimeAcquirer: c.TimeAcquirer, - TableActorSystem: c.tableActorSystem, ->>>>>>> c91af794e (*: fix changefeed checkpoint lag negative value error (#3013)) + PDClient: c.pdClient, + KVStorage: c.kvStorage, + CaptureInfo: c.info, + EtcdClient: c.etcdClient, + GrpcPool: c.grpcPool, + TimeAcquirer: c.TimeAcquirer, }) err := c.register(ctx) if err != nil { diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 8fc2cf4c33b..15268a01ac6 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -177,18 +177,11 @@ func (p *processor) tick(ctx cdcContext.Context, state *model.ChangefeedReactorS if err := p.checkTablesNum(ctx); err != nil { return nil, errors.Trace(err) } -<<<<<<< HEAD - p.handlePosition() -======= - if err := p.flushRedoLogMeta(ctx); err != nil { - return nil, err - } // it is no need to check the err here, because we will use // local time when an error return, which is acceptable pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached() p.handlePosition(oracle.GetPhysical(pdTime)) ->>>>>>> c91af794e (*: fix changefeed checkpoint lag negative value error (#3013)) p.pushResolvedTs2Table() p.handleWorkload() p.doGCSchemaStorage() diff --git a/pkg/context/context.go b/pkg/context/context.go index 8db99b40e68..7d00c9c00ba 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -18,17 +18,11 @@ import ( "log" "time" - "github.com/pingcap/ticdc/pkg/version" - "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/config" -<<<<<<< HEAD -======= - "github.com/pingcap/ticdc/pkg/etcd" "github.com/pingcap/ticdc/pkg/pdtime" "github.com/pingcap/ticdc/pkg/version" ->>>>>>> c91af794e (*: fix changefeed checkpoint lag negative value error (#3013)) tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/oracle" pd "github.com/tikv/pd/client" @@ -39,21 +33,12 @@ import ( // the lifecycle of vars in the GlobalVars shoule be aligned with the ticdc server process. // All field in Vars should be READ-ONLY and THREAD-SAFE type GlobalVars struct { -<<<<<<< HEAD - PDClient pd.Client - KVStorage tidbkv.Storage - CaptureInfo *model.CaptureInfo - EtcdClient *kv.CDCEtcdClient - GrpcPool kv.GrpcPool -======= - PDClient pd.Client - KVStorage tidbkv.Storage - CaptureInfo *model.CaptureInfo - EtcdClient *etcd.CDCEtcdClient - GrpcPool kv.GrpcPool - TimeAcquirer pdtime.TimeAcquirer - TableActorSystem *system.System ->>>>>>> c91af794e (*: fix changefeed checkpoint lag negative value error (#3013)) + PDClient pd.Client + KVStorage tidbkv.Storage + CaptureInfo *model.CaptureInfo + EtcdClient *kv.CDCEtcdClient + GrpcPool kv.GrpcPool + TimeAcquirer pdtime.TimeAcquirer } // ChangefeedVars contains some vars which can be used anywhere in a pipeline From be1d8a81be42fcfb7615b59c507dfd76ca6f6757 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 24 Nov 2021 19:53:13 +0800 Subject: [PATCH 3/5] acquirer_test: fix error --- pkg/pdtime/acquirer_test.go | 22 +++++++++++++++------- pkg/pdtime/main_test.go | 24 ------------------------ 2 files changed, 15 insertions(+), 31 deletions(-) delete mode 100644 pkg/pdtime/main_test.go diff --git a/pkg/pdtime/acquirer_test.go b/pkg/pdtime/acquirer_test.go index 55b2950192e..8e9e89fcc31 100644 --- a/pkg/pdtime/acquirer_test.go +++ b/pkg/pdtime/acquirer_test.go @@ -18,11 +18,19 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/tikv/client-go/v2/oracle" + "github.com/pingcap/check" + "github.com/pingcap/ticdc/pkg/util/testleak" + + "github.com/pingcap/tidb/store/tikv/oracle" pd "github.com/tikv/pd/client" ) +func TestSuite(t *testing.T) { check.TestingT(t) } + +type pdTimeSuite struct{} + +var _ = check.Suite(&pdTimeSuite{}) + // MockPDClient mocks pd.Client to facilitate unit testing. type MockPDClient struct { pd.Client @@ -33,8 +41,8 @@ func (m *MockPDClient) GetTS(ctx context.Context) (int64, int64, error) { return oracle.GetPhysical(time.Now()), 0, nil } -func TestTimeFromPD(t *testing.T) { - t.Parallel() +func (s *pdTimeSuite) TestTimeFromPD(c *check.C) { + defer testleak.AfterTest(c)() mockPDClient := &MockPDClient{} TimeAcquirer := NewTimeAcquirer(mockPDClient) go TimeAcquirer.Run(context.Background()) @@ -42,12 +50,12 @@ func TestTimeFromPD(t *testing.T) { time.Sleep(1 * time.Second) t1, err := TimeAcquirer.CurrentTimeFromCached() - require.Nil(t, err) + c.Assert(err, check.IsNil) time.Sleep(400 * time.Millisecond) // assume that the gc safe point updated one hour ago t2, err := TimeAcquirer.CurrentTimeFromCached() - require.Nil(t, err) + c.Assert(err, check.IsNil) // should return new time - require.NotEqual(t, t1, t2) + c.Assert(t1, check.Less, t2) } diff --git a/pkg/pdtime/main_test.go b/pkg/pdtime/main_test.go deleted file mode 100644 index 229d3e567f5..00000000000 --- a/pkg/pdtime/main_test.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package pdtime - -import ( - "testing" - - "github.com/pingcap/ticdc/pkg/leakutil" -) - -func TestMain(m *testing.M) { - leakutil.SetUpLeakTest(m) -} From 326d788f1d042af158b0833ff87294372cdaf2e3 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 24 Nov 2021 20:20:56 +0800 Subject: [PATCH 4/5] acquirer: fix error --- pkg/pdtime/acquirer.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/pdtime/acquirer.go b/pkg/pdtime/acquirer.go index 3fae739fa9d..49b1fda81c6 100644 --- a/pkg/pdtime/acquirer.go +++ b/pkg/pdtime/acquirer.go @@ -21,13 +21,14 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/retry" - "github.com/tikv/client-go/v2/oracle" + "github.com/pingcap/tidb/store/tikv/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" ) const pdTimeUpdateInterval = 200 * time.Millisecond +// TimeAcquirer cache time get from PD periodically type TimeAcquirer interface { // Run run the TimeAcquirer Run(ctx context.Context) @@ -37,7 +38,7 @@ type TimeAcquirer interface { Stop() } -// TimeAcquirerImpl cache time get from PD periodically and cache it +// TimeAcquirerImpl cache time get from PD periodically type TimeAcquirerImpl struct { pdClient pd.Client timeCache time.Time @@ -72,6 +73,7 @@ func (c *TimeAcquirerImpl) Run(ctx context.Context) { return err } c.mu.Lock() + c.timeCache = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0)) c.err = nil c.mu.Unlock() @@ -97,22 +99,28 @@ func (c *TimeAcquirerImpl) CurrentTimeFromCached() (time.Time, error) { return cacheTime, errors.Trace(err) } +// Stop stop TimeAcquirer func (c *TimeAcquirerImpl) Stop() { c.cancel() } +// TimeAcquirer4Test only for test type TimeAcquirer4Test struct{} +// NewTimeAcquirer4Test return a TimeAcquirer for test func NewTimeAcquirer4Test() TimeAcquirer { return &TimeAcquirer4Test{} } +// CurrentTimeFromCached return current time func (c *TimeAcquirer4Test) CurrentTimeFromCached() (time.Time, error) { return time.Now(), nil } +// Run implements TimeAcquirer func (c *TimeAcquirer4Test) Run(ctx context.Context) { } +// Stop implements TimeAcquirer func (c *TimeAcquirer4Test) Stop() { } From 83bf5288586089184db321615b59ee358cb608ba Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 24 Nov 2021 20:41:54 +0800 Subject: [PATCH 5/5] gc_manager_test: fix error --- pkg/pdtime/acquirer_test.go | 1 - pkg/txnutil/gc/gc_manager_test.go | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/pdtime/acquirer_test.go b/pkg/pdtime/acquirer_test.go index 8e9e89fcc31..c769934c304 100644 --- a/pkg/pdtime/acquirer_test.go +++ b/pkg/pdtime/acquirer_test.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/check" "github.com/pingcap/ticdc/pkg/util/testleak" - "github.com/pingcap/tidb/store/tikv/oracle" pd "github.com/tikv/pd/client" ) diff --git a/pkg/txnutil/gc/gc_manager_test.go b/pkg/txnutil/gc/gc_manager_test.go index 3423b58855a..92ad0c23bb4 100644 --- a/pkg/txnutil/gc/gc_manager_test.go +++ b/pkg/txnutil/gc/gc_manager_test.go @@ -18,12 +18,11 @@ import ( "testing" "time" - "github.com/pingcap/ticdc/pkg/pdtime" - "github.com/pingcap/check" "github.com/pingcap/errors" cdcContext "github.com/pingcap/ticdc/pkg/context" cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/pdtime" "github.com/pingcap/ticdc/pkg/util/testleak" "github.com/pingcap/tidb/store/tikv/oracle" )