diff --git a/cdc/changefeed.go b/cdc/changefeed.go index 6a5d2ece4fb..3f1a2841f1b 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -79,6 +79,10 @@ type changeFeed struct { id string info *model.ChangeFeedInfo status *model.ChangeFeedStatus + // appliedCheckpointTs is the latest checkpointTs that has already been flushed to Etcd. + // Any operation that assumes the global checkpoint has advanced must check this field + // to make sure the new checkpoint is visible to the processors. + appliedCheckpointTs uint64 schema *entry.SingleSchemaSnapshot ddlState model.ChangeFeedDDLState @@ -292,6 +296,11 @@ func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[model return nil } + // Do NOT rebalance orphan tables before the checkpoint ts has advanced to the FinishedTS of a DDL job + if c.appliedCheckpointTs != c.status.CheckpointTs { + return nil + } + captureIDs := make(map[model.CaptureID]struct{}, len(captures)) cleanedTables := make(map[model.TableID]struct{}) addedTables := make(map[model.TableID]struct{}) @@ -532,9 +541,20 @@ func (c *changeFeed) handleMoveTableJobs(ctx context.Context, captures map[model job.Status = model.MoveTableStatusDeleted log.Info("handle the move job, remove table from the source capture", zap.Reflect("job", job)) case model.MoveTableStatusDeleted: + // Do NOT dispatch tables before the checkpoint ts has been flushed to Etcd. + if c.appliedCheckpointTs != c.status.CheckpointTs { + log.Debug("handle the move job, waiting for checkpoint ts to be uploaded", + zap.Uint64("applied-checkpoint-ts", c.appliedCheckpointTs), + zap.Uint64("latest-checkpoint-ts", c.status.CheckpointTs)) + continue + } + // add table to target capture status, exist := cloneStatus(job.To) replicaInfo := job.TableReplicaInfo.Clone() + if replicaInfo.StartTs < c.status.CheckpointTs { + replicaInfo.StartTs = c.status.CheckpointTs + } if !exist { // the target capture is not exist, add table to orphanTables.
c.orphanTables[tableID] = replicaInfo.StartTs @@ -614,6 +634,7 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C if c.ddlState != model.ChangeFeedWaitToExecDDL { return nil } + if len(c.ddlJobHistory) == 0 { log.Panic("ddl job history can not be empty in changefeed when should to execute DDL") } @@ -624,14 +645,21 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C return nil } - if c.status.CheckpointTs != todoDDLJob.BinlogInfo.FinishedTS { + if c.appliedCheckpointTs < todoDDLJob.BinlogInfo.FinishedTS-1 { log.Debug("wait checkpoint ts", zap.Uint64("checkpoint ts", c.status.CheckpointTs), + zap.Uint64("applied checkpoint ts", c.appliedCheckpointTs), zap.Uint64("finish ts", todoDDLJob.BinlogInfo.FinishedTS), zap.String("ddl query", todoDDLJob.Query)) return nil } + if c.appliedCheckpointTs >= todoDDLJob.BinlogInfo.FinishedTS { + log.Panic("applied checkpoint ts is larger than DDL finish ts", + zap.Uint64("applied checkpoint ts", c.appliedCheckpointTs), + zap.Uint64("finish ts", todoDDLJob.BinlogInfo.FinishedTS)) + } + log.Info("apply job", zap.Stringer("job", todoDDLJob), zap.String("schema", todoDDLJob.SchemaName), zap.String("query", todoDDLJob.Query), @@ -669,10 +697,6 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C return nil } - err = c.balanceOrphanTables(ctx, captures) - if err != nil { - return errors.Trace(err) - } executed := false if !c.cyclicEnabled || c.info.Config.Cyclic.SyncDDL { failpoint.Inject("InjectChangefeedDDLError", func() { @@ -867,6 +891,10 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { c.ddlTs = minResolvedTs } + if len(c.ddlJobHistory) > 0 && minCheckpointTs >= c.ddlJobHistory[0].BinlogInfo.FinishedTS { + minCheckpointTs = c.ddlJobHistory[0].BinlogInfo.FinishedTS - 1 + } + // if downstream sink is the MQ sink, the MQ sink do not promise that checkpoint is less than globalResolvedTs if minCheckpointTs > minResolvedTs { minCheckpointTs = minResolvedTs diff --git a/cdc/owner.go b/cdc/owner.go index 74858e85933..e3416ba1bd7 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -132,6 +132,10 @@ func NewOwner( cli := kv.NewCDCEtcdClient(ctx, sess.Client()) endpoints := sess.Client().Endpoints() + failpoint.Inject("ownerFlushIntervalInject", func(val failpoint.Value) { + flushChangefeedInterval = time.Millisecond * time.Duration(val.(int)) + }) + owner := &Owner{ done: make(chan struct{}), session: sess, @@ -187,8 +191,15 @@ func (o *Owner) removeCapture(info *model.CaptureInfo) { startTs = feed.status.CheckpointTs } - for tableID := range task.Tables { + for tableID, replicaInfo := range task.Tables { feed.orphanTables[tableID] = startTs + if startTs < replicaInfo.StartTs { + log.Warn("table startTs not consistent", + zap.Uint64("table-start-ts", replicaInfo.StartTs), + zap.Uint64("checkpoint-ts", startTs), + zap.Reflect("status", feed.status)) + feed.orphanTables[tableID] = replicaInfo.StartTs + } } ctx := context.TODO() @@ -412,23 +423,24 @@ func (o *Owner) newChangeFeed( ResolvedTs: 0, CheckpointTs: checkpointTs, }, - scheduler: scheduler.NewScheduler(info.Config.Scheduler.Tp), - ddlState: model.ChangeFeedSyncDML, - ddlExecutedTs: checkpointTs, - targetTs: info.GetTargetTs(), - ddlTs: 0, - updateResolvedTs: true, - startTimer: make(chan bool), - syncpointStore: syncpointStore, - syncCancel: nil, - taskStatus: processorsInfos, - taskPositions: taskPositions, - etcdCli: o.etcdClient, - filter: filter, - sink: primarySink, - cyclicEnabled: 
info.Config.Cyclic.IsEnabled(), - lastRebalanceTime: time.Now(), - cancel: cancel, + appliedCheckpointTs: checkpointTs, + scheduler: scheduler.NewScheduler(info.Config.Scheduler.Tp), + ddlState: model.ChangeFeedSyncDML, + ddlExecutedTs: checkpointTs, + targetTs: info.GetTargetTs(), + ddlTs: 0, + updateResolvedTs: true, + startTimer: make(chan bool), + syncpointStore: syncpointStore, + syncCancel: nil, + taskStatus: processorsInfos, + taskPositions: taskPositions, + etcdCli: o.etcdClient, + filter: filter, + sink: primarySink, + cyclicEnabled: info.Config.Cyclic.IsEnabled(), + lastRebalanceTime: time.Now(), + cancel: cancel, } return cf, nil } @@ -672,8 +684,8 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { snapshot := make(map[model.ChangeFeedID]*model.ChangeFeedStatus, len(o.changeFeeds)) for id, changefeed := range o.changeFeeds { snapshot[id] = changefeed.status - if changefeed.status.CheckpointTs < minCheckpointTs { - minCheckpointTs = changefeed.status.CheckpointTs + if changefeed.appliedCheckpointTs < minCheckpointTs { + minCheckpointTs = changefeed.appliedCheckpointTs } phyTs := oracle.ExtractPhysical(changefeed.status.CheckpointTs) @@ -687,6 +699,9 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { if err != nil { return errors.Trace(err) } + for id, changefeedStatus := range snapshot { + o.changeFeeds[id].appliedCheckpointTs = changefeedStatus.CheckpointTs + } o.lastFlushChangefeeds = time.Now() } } @@ -1187,26 +1202,28 @@ func (o *Owner) run(ctx context.Context) error { return errors.Trace(err) } - err = o.calcResolvedTs(ctx) + err = o.handleDDL(ctx) if err != nil { return errors.Trace(err) } - err = o.handleDDL(ctx) + err = o.handleSyncPoint(ctx) if err != nil { return errors.Trace(err) } - err = o.handleSyncPoint(ctx) + err = o.handleAdminJob(ctx) if err != nil { return errors.Trace(err) } - err = o.handleAdminJob(ctx) + err = o.calcResolvedTs(ctx) if err != nil { return errors.Trace(err) } + // It is better for flushChangeFeedInfos to follow calcResolvedTs immediately, + // because operations such as handleDDL and rebalancing rely on proper progress of the checkpoint in Etcd. 
err = o.flushChangeFeedInfos(ctx) if err != nil { return errors.Trace(err) @@ -1323,7 +1340,9 @@ func (o *Owner) cleanUpStaleTasks(ctx context.Context) error { for tableID, replicaInfo := range status.Tables { startTs := replicaInfo.StartTs if taskPosFound { - startTs = pos.CheckPointTs + if startTs < pos.CheckPointTs { + startTs = pos.CheckPointTs + } } o.addOrphanTable(changeFeedID, tableID, startTs) } diff --git a/cdc/owner_test.go b/cdc/owner_test.go index 5aedf33505a..b6c2476ff3f 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -922,8 +922,8 @@ func (s *ownerSuite) TestCleanUpStaleTasks(c *check.C) { c.Assert(err, check.IsNil) c.Assert(len(owner.captures), check.Equals, 1) c.Assert(owner.captures, check.HasKey, capture.info.ID) + c.Assert(owner.changeFeeds[changefeed].orphanTables, check.DeepEquals, map[model.TableID]model.Ts{51: 110}) c.Assert(atomic.LoadInt32(&owner.captureLoaded), check.Equals, int32(1)) - c.Assert(owner.changeFeeds[changefeed].orphanTables, check.DeepEquals, map[model.TableID]model.Ts{51: 100}) // check stale tasks are cleaned up statuses, err = s.client.GetAllTaskStatus(ctx, changefeed) c.Assert(err, check.IsNil) diff --git a/cdc/processor.go b/cdc/processor.go index 932e2555d11..d934c4d7117 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -225,6 +225,15 @@ func newProcessor( p.status = status p.statusModRevision = modRevision + info, _, err := p.etcdCli.GetChangeFeedStatus(ctx, p.changefeedID) + if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) { + return nil, errors.Trace(err) + } + + if err == nil { + p.globalcheckpointTs = info.CheckpointTs + } + for tableID, replicaInfo := range p.status.Tables { p.addTable(ctx, tableID, replicaInfo) } @@ -770,6 +779,8 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs) if replicaInfo.StartTs < globalcheckpointTs { + // use Warn instead of Panic in case that p.globalcheckpointTs has not been initialized. + // The cdc_state_checker will catch a real inconsistency in integration tests. log.Warn("addTable: startTs < checkpoint", util.ZapFieldChangefeed(ctx), zap.Int64("tableID", tableID), @@ -1074,6 +1085,11 @@ func (p *processor) sorterConsume( } return } + + if checkpointTs < replicaInfo.StartTs { + checkpointTs = replicaInfo.StartTs + } + if checkpointTs != 0 { atomic.StoreUint64(pCheckpointTs, checkpointTs) p.localCheckpointTsNotifier.Notify() diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index ed00bd63c14..49c92841eaf 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -136,7 +136,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, timerInterval time.Duration) // We are safe to update the ReactorState only if there is no pending patch. 
for _, update := range worker.pendingUpdates { - err := worker.state.Update(update.key, update.value) + err := worker.state.Update(update.key, update.value, false) if err != nil { return errors.Trace(err) } @@ -181,7 +181,12 @@ func (worker *EtcdWorker) syncRawState(ctx context.Context) error { worker.rawState = make(map[util.EtcdKey][]byte) for _, kv := range resp.Kvs { - worker.rawState[util.NewEtcdKeyFromBytes(kv.Key)] = kv.Value + key := util.NewEtcdKeyFromBytes(kv.Key) + worker.rawState[key] = kv.Value + err := worker.state.Update(key, kv.Value, true) + if err != nil { + return errors.Trace(err) + } } worker.revision = resp.Header.Revision diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index 22e8e4e353c..f4ca6e2b8be 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -146,7 +146,7 @@ func (s *simpleReactorState) SetSum(sum int) { s.patches = append(s.patches, patch) } -func (s *simpleReactorState) Update(key util.EtcdKey, value []byte) error { +func (s *simpleReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error { subMatches := keyParseRegexp.FindSubmatch(key.Bytes()) if len(subMatches) != 2 { log.Panic("illegal Etcd key", zap.ByteString("key", key.Bytes())) @@ -279,13 +279,13 @@ type intReactorState struct { isUpdated bool } -func (s *intReactorState) Update(key util.EtcdKey, value []byte) error { +func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error { var err error s.val, err = strconv.Atoi(string(value)) if err != nil { log.Panic("intReactorState", zap.Error(err)) } - s.isUpdated = true + s.isUpdated = !isInit return nil } diff --git a/pkg/orchestrator/interfaces.go b/pkg/orchestrator/interfaces.go index 8cf89d78067..f0f5290e6f6 100644 --- a/pkg/orchestrator/interfaces.go +++ b/pkg/orchestrator/interfaces.go @@ -38,7 +38,7 @@ type DataPatch struct { // ReactorState models the Etcd state of a reactor type ReactorState interface { // Update is called by EtcdWorker to notify the Reactor of a latest change to the Etcd state. - Update(key util.EtcdKey, value []byte) error + Update(key util.EtcdKey, value []byte, isInit bool) error // GetPatches is called by EtcdWorker, and should return a slice of data patches that represents the changes // that a Reactor wants to apply to Etcd. diff --git a/pkg/orchestrator/jsonstate/json_reactor_state.go b/pkg/orchestrator/jsonstate/json_reactor_state.go index 3e42daa2fe0..d16620b72fc 100644 --- a/pkg/orchestrator/jsonstate/json_reactor_state.go +++ b/pkg/orchestrator/jsonstate/json_reactor_state.go @@ -59,7 +59,7 @@ func NewJSONReactorState(key string, data interface{}) (*JSONReactorState, error } // Update implements the ReactorState interface. -func (s *JSONReactorState) Update(key util.EtcdKey, value []byte) error { +func (s *JSONReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error { if key != s.key { return nil } diff --git a/testing_utils/cdc_state_checker/cdc_monitor.go b/testing_utils/cdc_state_checker/cdc_monitor.go new file mode 100644 index 00000000000..ced0bca63c3 --- /dev/null +++ b/testing_utils/cdc_state_checker/cdc_monitor.go @@ -0,0 +1,99 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "time" + + "github.com/pingcap/ticdc/pkg/security" + + "github.com/pingcap/log" + + "github.com/pingcap/errors" + "github.com/pingcap/ticdc/cdc/kv" + "github.com/pingcap/ticdc/pkg/etcd" + "github.com/pingcap/ticdc/pkg/orchestrator" + "github.com/prometheus/client_golang/prometheus" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/pkg/logutil" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" +) + +type cdcMonitor struct { + etcdCli *etcd.Client + etcdWorker *orchestrator.EtcdWorker + reactor *cdcMonitReactor +} + +func newCDCMonitor(ctx context.Context, pd string, credential *security.Credential) (*cdcMonitor, error) { + logConfig := logutil.DefaultZapLoggerConfig + logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel) + + grpcCredential, err := credential.ToGRPCDialOption() + if err != nil { + return nil, errors.Trace(err) + } + + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{pd}, + TLS: nil, + Context: ctx, + LogConfig: &logConfig, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{ + grpcCredential, + grpc.WithBlock(), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Second, + Multiplier: 1.1, + Jitter: 0.1, + MaxDelay: 3 * time.Second, + }, + MinConnectTimeout: 3 * time.Second, + }), + }, + }) + if err != nil { + return nil, errors.Trace(err) + } + + wrappedCli := etcd.Wrap(etcdCli, map[string]prometheus.Counter{}) + reactor := &cdcMonitReactor{} + initState := newCDCReactorState() + etcdWorker, err := orchestrator.NewEtcdWorker(wrappedCli, kv.EtcdKeyBase, reactor, initState) + if err != nil { + return nil, errors.Trace(err) + } + + ret := &cdcMonitor{ + etcdCli: wrappedCli, + etcdWorker: etcdWorker, + reactor: reactor, + } + + return ret, nil +} + +func (m *cdcMonitor) run(ctx context.Context) error { + log.Debug("start running cdcMonitor") + err := m.etcdWorker.Run(ctx, 200*time.Millisecond) + log.Error("etcdWorker exited: test-case-failed", zap.Error(err)) + log.Info("CDC state", zap.Reflect("state", m.reactor.state)) + return err +} diff --git a/testing_utils/cdc_state_checker/main.go b/testing_utils/cdc_state_checker/main.go new file mode 100644 index 00000000000..b1408f91d19 --- /dev/null +++ b/testing_utils/cdc_state_checker/main.go @@ -0,0 +1,61 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "context" + "flag" + "strings" + + "github.com/pingcap/ticdc/pkg/security" + + "github.com/pingcap/log" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +var ( + pd = flag.String("pd", "http://127.0.0.1:2379", "PD address and port") + caPath = flag.String("ca", "", "CA certificate path for TLS connection") + certPath = flag.String("cert", "", "Certificate path for TLS connection") + keyPath = flag.String("key", "", "Private key path for TLS connection") + allowedCertCN = flag.String("cert-allowed-cn", "", "Verify caller's identity "+ + "(cert Common Name). Use `,` to separate multiple CN") +) + +func main() { + flag.Parse() + log.SetLevel(zapcore.DebugLevel) + + cdcMonitor, err := newCDCMonitor(context.TODO(), *pd, getCredential()) + if err != nil { + log.Panic("Error creating CDCMonitor", zap.Error(err)) + } + + err = cdcMonitor.run(context.TODO()) + log.Panic("cdcMonitor exited", zap.Error(err)) +} + +func getCredential() *security.Credential { + var certAllowedCN []string + if len(*allowedCertCN) != 0 { + certAllowedCN = strings.Split(*allowedCertCN, ",") + } + return &security.Credential{ + CAPath: *caPath, + CertPath: *certPath, + KeyPath: *keyPath, + CertAllowedCN: certAllowedCN, + } +} diff --git a/testing_utils/cdc_state_checker/reactor.go b/testing_utils/cdc_state_checker/reactor.go new file mode 100644 index 00000000000..2c0acd8d1e8 --- /dev/null +++ b/testing_utils/cdc_state_checker/reactor.go @@ -0,0 +1,98 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/orchestrator" + "go.uber.org/zap" +) + +type cdcMonitReactor struct { + state *cdcReactorState +} + +func (r *cdcMonitReactor) Tick(_ context.Context, state orchestrator.ReactorState) (orchestrator.ReactorState, error) { + r.state = state.(*cdcReactorState) + + err := r.verifyTs() + if err != nil { + log.Error("Verifying Ts failed", zap.Error(err)) + return r.state, err + } + + err = r.verifyStartTs() + if err != nil { + log.Error("Verifying startTs failed", zap.Error(err)) + return r.state, err + } + + return r.state, nil +} + +func (r *cdcMonitReactor) verifyTs() error { + for changefeedID, positions := range r.state.TaskPositions { + status, ok := r.state.ChangefeedStatuses[changefeedID] + if !ok { + log.Warn("changefeed status not found", zap.String("cfid", changefeedID)) + return nil + } + + actualCheckpointTs := status.CheckpointTs + + for captureID, position := range positions { + if _, ok := r.state.Captures[captureID]; !ok { + // ignore positions whose capture is no longer present + continue + } + + if position.CheckPointTs < actualCheckpointTs { + return errors.Errorf("checkpointTs too large, globalCkpt = %d, localCkpt = %d, capture = %s, cfid = %s", + actualCheckpointTs, position.CheckPointTs, captureID, changefeedID) + } + } + } + + return nil +} + +func (r *cdcMonitReactor) verifyStartTs() error { + for changefeedID, statuses := range r.state.TaskStatuses { + cStatus, ok := r.state.ChangefeedStatuses[changefeedID] + if !ok { + log.Warn("changefeed status not found", zap.String("cfid", changefeedID)) + return nil + } + + actualCheckpointTs := cStatus.CheckpointTs + + for captureID, status := range statuses { + for tableID, operation := range status.Operation { + if operation.Status != model.OperFinished && !operation.Delete { + startTs := status.Tables[tableID].StartTs + if startTs < actualCheckpointTs { + return errors.Errorf("startTs too small, globalCkpt = %d, startTs = %d, table = %d, capture = %s, cfid = %s", + actualCheckpointTs, startTs, tableID, captureID, changefeedID) + } + } + } + } + } + + return nil +} diff --git a/testing_utils/cdc_state_checker/state.go b/testing_utils/cdc_state_checker/state.go new file mode 100644 index 00000000000..da7b84f1ac5 --- /dev/null +++ b/testing_utils/cdc_state_checker/state.go @@ -0,0 +1,232 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package main + +import ( + "encoding/json" + "regexp" + + "go.uber.org/zap" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/kv" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/orchestrator" + "github.com/pingcap/ticdc/pkg/orchestrator/util" +) + +type cdcReactorState struct { + Owner model.CaptureID + Captures map[model.CaptureID]*model.CaptureInfo + ChangefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatus + TaskPositions map[model.ChangeFeedID]map[model.CaptureID]*model.TaskPosition + TaskStatuses map[model.ChangeFeedID]map[model.CaptureID]*model.TaskStatus +} + +var ( + captureRegex = regexp.MustCompile(regexp.QuoteMeta(kv.CaptureInfoKeyPrefix) + "/(.+)") + changefeedRegex = regexp.MustCompile(regexp.QuoteMeta(kv.JobKeyPrefix) + "/(.+)") + positionRegex = regexp.MustCompile(regexp.QuoteMeta(kv.TaskPositionKeyPrefix) + "/(.+?)/(.+)") + statusRegex = regexp.MustCompile(regexp.QuoteMeta(kv.TaskStatusKeyPrefix) + "/(.+?)/(.+)") +) + +func newCDCReactorState() *cdcReactorState { + return &cdcReactorState{ + Captures: make(map[model.CaptureID]*model.CaptureInfo), + ChangefeedStatuses: make(map[model.ChangeFeedID]*model.ChangeFeedStatus), + TaskPositions: make(map[model.ChangeFeedID]map[model.CaptureID]*model.TaskPosition), + TaskStatuses: make(map[model.ChangeFeedID]map[model.CaptureID]*model.TaskStatus), + } +} + +func (s *cdcReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error { + if key.String() == kv.CaptureOwnerKey { + if value == nil { + log.Info("Owner lost", zap.String("old-owner", s.Owner)) + return nil + } + + log.Info("Owner updated", zap.String("old-owner", s.Owner), + zap.ByteString("new-owner", value)) + s.Owner = string(value) + return nil + } + + if matches := captureRegex.FindSubmatch(key.Bytes()); matches != nil { + captureID := string(matches[1]) + + if value == nil { + log.Info("Capture deleted", + zap.String("captureID", captureID), + zap.Reflect("old-capture", s.Captures[captureID])) + + delete(s.Captures, captureID) + return nil + } + + var newCaptureInfo model.CaptureInfo + err := json.Unmarshal(value, &newCaptureInfo) + if err != nil { + return errors.Trace(err) + } + + if oldCaptureInfo, ok := s.Captures[captureID]; ok { + log.Info("Capture updated", + zap.String("captureID", captureID), + zap.Reflect("old-capture", oldCaptureInfo), + zap.Reflect("new-capture", newCaptureInfo)) + } else { + log.Info("Capture added", + zap.String("captureID", captureID), + zap.Reflect("new-capture", newCaptureInfo)) + } + + s.Captures[captureID] = &newCaptureInfo + return nil + } + + if matches := changefeedRegex.FindSubmatch(key.Bytes()); matches != nil { + changefeedID := string(matches[1]) + + if value == nil { + log.Info("Changefeed deleted", + zap.String("changefeedID", changefeedID), + zap.Reflect("old-changefeed", s.ChangefeedStatuses)) + + delete(s.ChangefeedStatuses, changefeedID) + return nil + } + + var newChangefeedStatus model.ChangeFeedStatus + err := json.Unmarshal(value, &newChangefeedStatus) + if err != nil { + return errors.Trace(err) + } + + if oldChangefeedInfo, ok := s.ChangefeedStatuses[changefeedID]; ok { + log.Info("Changefeed updated", + zap.String("changefeedID", changefeedID), + zap.Reflect("old-changefeed", oldChangefeedInfo), + zap.Reflect("new-changefeed", newChangefeedStatus)) + } else { + log.Info("Changefeed added", + zap.String("changefeedID", changefeedID), + zap.Reflect("new-changefeed", newChangefeedStatus)) + } + + s.ChangefeedStatuses[changefeedID] = 
&newChangefeedStatus + + return nil + } + + if matches := positionRegex.FindSubmatch(key.Bytes()); matches != nil { + captureID := string(matches[1]) + changefeedID := string(matches[2]) + + if value == nil { + log.Info("Position deleted", + zap.String("captureID", captureID), + zap.String("changefeedID", changefeedID), + zap.Reflect("old-position", s.TaskPositions[changefeedID][captureID])) + + delete(s.TaskPositions[changefeedID], captureID) + if len(s.TaskPositions[changefeedID]) == 0 { + delete(s.TaskPositions, changefeedID) + } + + return nil + } + + var newTaskPosition model.TaskPosition + err := json.Unmarshal(value, &newTaskPosition) + if err != nil { + return errors.Trace(err) + } + + if _, ok := s.TaskPositions[changefeedID]; !ok { + s.TaskPositions[changefeedID] = make(map[model.CaptureID]*model.TaskPosition) + } + + if position, ok := s.TaskPositions[changefeedID][captureID]; ok { + log.Info("Position updated", + zap.String("captureID", captureID), + zap.String("changefeedID", changefeedID), + zap.Reflect("old-position", position), + zap.Reflect("new-position", newTaskPosition)) + } else { + log.Info("Position created", + zap.String("captureID", captureID), + zap.String("changefeedID", changefeedID), + zap.Reflect("new-position", newTaskPosition)) + } + + s.TaskPositions[changefeedID][captureID] = &newTaskPosition + + return nil + } + + if matches := statusRegex.FindSubmatch(key.Bytes()); matches != nil { + captureID := string(matches[1]) + changefeedID := string(matches[2]) + + if value == nil { + log.Info("Status deleted", + zap.String("captureID", captureID), + zap.String("changefeedID", changefeedID), + zap.Reflect("old-status", s.TaskStatuses[changefeedID][captureID])) + + delete(s.TaskStatuses[changefeedID], captureID) + if len(s.TaskStatuses[changefeedID]) == 0 { + delete(s.TaskStatuses, changefeedID) + } + + return nil + } + + var newTaskStatus model.TaskStatus + err := json.Unmarshal(value, &newTaskStatus) + if err != nil { + return errors.Trace(err) + } + + if _, ok := s.TaskStatuses[changefeedID]; !ok { + s.TaskStatuses[changefeedID] = make(map[model.CaptureID]*model.TaskStatus) + } + + if status, ok := s.TaskStatuses[changefeedID][captureID]; ok { + log.Info("Status updated", + zap.String("captureID", captureID), + zap.String("changefeedID", changefeedID), + zap.Reflect("old-status", status), + zap.Reflect("new-status", newTaskStatus)) + } else { + log.Info("Status updated", + zap.String("captureID", captureID), + zap.String("changefeedID", changefeedID), + zap.Reflect("new-status", newTaskStatus)) + } + + s.TaskStatuses[changefeedID][captureID] = &newTaskStatus + + return nil + } + + log.Debug("Etcd operation ignored", zap.String("key", key.String()), zap.ByteString("value", value)) + return nil +} + +func (s *cdcReactorState) GetPatches() []*orchestrator.DataPatch { + return nil +} diff --git a/tests/_utils/check_cdc_state_log b/tests/_utils/check_cdc_state_log new file mode 100755 index 00000000000..6eafaf12ff0 --- /dev/null +++ b/tests/_utils/check_cdc_state_log @@ -0,0 +1,18 @@ +#!/bin/bash +# parameter 1: work directory +WORK_DIR=$1 + +set +e + +if [ ! -f $WORK_DIR/cdc_etcd_check.log ]; then + exit 0 +fi + +grep -q -i test-case-failed $WORK_DIR/cdc_etcd_check.log + +if [ $? 
-eq 0 ]; then + echo "cdc state checker failed" + exit 1 +else + exit 0 +fi diff --git a/tests/_utils/start_tidb_cluster_impl b/tests/_utils/start_tidb_cluster_impl index 5a55f4619dc..a5e23b58ad6 100755 --- a/tests/_utils/start_tidb_cluster_impl +++ b/tests/_utils/start_tidb_cluster_impl @@ -319,3 +319,11 @@ while ! curl -o /dev/null -sf http://127.0.0.1:17000/metrics 1>/dev/null 2>&1; d fi sleep 2 done + +echo "Starting CDC state checker..." +cd $CUR/../../testing_utils/cdc_state_checker +if [ ! -f ./cdc_state_checker ]; then + GO111MODULE=on go build +fi +./cdc_state_checker -pd ${UP_PD_HOST_1}:${UP_PD_PEER_PORT_1} > $OUT_DIR/cdc_etcd_check.log & +cd $OUT_DIR diff --git a/tests/_utils/start_tls_tidb_cluster_impl b/tests/_utils/start_tls_tidb_cluster_impl index 9383655c85b..95936ade544 100755 --- a/tests/_utils/start_tls_tidb_cluster_impl +++ b/tests/_utils/start_tls_tidb_cluster_impl @@ -144,3 +144,14 @@ run_sql "update mysql.tidb set variable_value='60m' where variable_name='tikv_gc --ssl-ca=$TLS_DIR/ca.pem \ --ssl-cert=$TLS_DIR/server.pem \ --ssl-key=$TLS_DIR/server-key.pem || echo "tidb_enable_clustered_index not supports" + +echo "Starting CDC state checker..." +cd $CUR/../../testing_utils/cdc_state_checker +if [ ! -f ./cdc_state_checker ]; then + GO111MODULE=on go build +fi +./cdc_state_checker -pd ${TLS_PD_HOST}:${TLS_PD_PORT} \ + -ca $TLS_DIR/ca.pem \ + -cert $TLS_DIR/server.pem \ + -key $TLS_DIR/server-key.pem > $OUT_DIR/cdc_etcd_check.log & +cd $OUT_DIR diff --git a/tests/_utils/stop_tidb_cluster b/tests/_utils/stop_tidb_cluster index a0bc0abc507..be1fb7b21d7 100755 --- a/tests/_utils/stop_tidb_cluster +++ b/tests/_utils/stop_tidb_cluster @@ -7,6 +7,7 @@ killall -w -s 9 cdc || true killall -w -s 9 cdc.test || true killall -w -s 9 tiflash || true killall -w -s 9 flash_cluster_manager || true +killall -w -s 9 cdc_state_checker || true CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) source $CUR/../_utils/test_prepare diff --git a/tests/autorandom/run.sh b/tests/autorandom/run.sh index 0fa189122c8..0b9958da106 100644 --- a/tests/autorandom/run.sh +++ b/tests/autorandom/run.sh @@ -33,10 +33,10 @@ function run() { # sync_diff can't check non-exist table, so we check expected tables are created in downstream first check_table_exists autorandom_test.table_a ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - cleanup_process $CDC_BINARY } trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/availability/run.sh b/tests/availability/run.sh index 76c0d9c74b5..45da818d51e 100644 --- a/tests/availability/run.sh +++ b/tests/availability/run.sh @@ -35,4 +35,5 @@ prepare $* test_owner_ha $* test_capture_ha $* test_processor_ha $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/batch_add_table/run.sh b/tests/batch_add_table/run.sh index 27dce04f57d..246463deed3 100644 --- a/tests/batch_add_table/run.sh +++ b/tests/batch_add_table/run.sh @@ -42,4 +42,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! 
>>>>>>" diff --git a/tests/capture_session_done_during_task/run.sh b/tests/capture_session_done_during_task/run.sh index 8e00d44dafc..bc63dfa8aa5 100644 --- a/tests/capture_session_done_during_task/run.sh +++ b/tests/capture_session_done_during_task/run.sh @@ -56,4 +56,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/cdc/run.sh b/tests/cdc/run.sh index 16139a73799..fd985f70038 100755 --- a/tests/cdc/run.sh +++ b/tests/cdc/run.sh @@ -39,4 +39,5 @@ cd "$(dirname "$0")" set -o pipefail GO111MODULE=on go run cdc.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log cleanup_process $CDC_BINARY +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/cdclog_file/run.sh b/tests/cdclog_file/run.sh index 2cd62336789..4170af11535 100644 --- a/tests/cdclog_file/run.sh +++ b/tests/cdclog_file/run.sh @@ -81,4 +81,5 @@ function cdclog_test() { trap stop_tidb_cluster EXIT prepare $* cdclog_test $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/cdclog_s3/run.sh b/tests/cdclog_s3/run.sh index d4e5c45ec97..2c33d1fa34d 100644 --- a/tests/cdclog_s3/run.sh +++ b/tests/cdclog_s3/run.sh @@ -113,4 +113,5 @@ function cdclog_test() { trap stop EXIT prepare $* cdclog_test $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/changefeed_auto_stop/run.sh b/tests/changefeed_auto_stop/run.sh index f4d63afde6a..ae26cc39bc0 100755 --- a/tests/changefeed_auto_stop/run.sh +++ b/tests/changefeed_auto_stop/run.sh @@ -71,4 +71,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/changefeed_error/run.sh b/tests/changefeed_error/run.sh index 9513fa2b5c0..122f58a23f8 100755 --- a/tests/changefeed_error/run.sh +++ b/tests/changefeed_error/run.sh @@ -126,4 +126,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/changefeed_finish/run.sh b/tests/changefeed_finish/run.sh index 0711c7148d6..7a1ee371bd9 100755 --- a/tests/changefeed_finish/run.sh +++ b/tests/changefeed_finish/run.sh @@ -69,4 +69,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/changefeed_pause_resume/run.sh b/tests/changefeed_pause_resume/run.sh index 5dcd7a8a3aa..17c849a45e6 100755 --- a/tests/changefeed_pause_resume/run.sh +++ b/tests/changefeed_pause_resume/run.sh @@ -65,4 +65,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/changefeed_reconstruct/run.sh b/tests/changefeed_reconstruct/run.sh index a4137bdf40f..4ce98f01d05 100755 --- a/tests/changefeed_reconstruct/run.sh +++ b/tests/changefeed_reconstruct/run.sh @@ -81,4 +81,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! 
>>>>>>" diff --git a/tests/cli/run.sh b/tests/cli/run.sh index 9c2b14e4dc7..34d3aabd8b3 100644 --- a/tests/cli/run.sh +++ b/tests/cli/run.sh @@ -182,4 +182,5 @@ EOF trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/common_1/run.sh b/tests/common_1/run.sh index c92788aee85..fd252a8ba38 100644 --- a/tests/common_1/run.sh +++ b/tests/common_1/run.sh @@ -63,4 +63,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/cyclic_ab/run.sh b/tests/cyclic_ab/run.sh index 44922d88db8..de7132731b4 100644 --- a/tests/cyclic_ab/run.sh +++ b/tests/cyclic_ab/run.sh @@ -112,4 +112,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/cyclic_abc/run.sh b/tests/cyclic_abc/run.sh index 0eb5e97f25e..2ce291e8bb4 100644 --- a/tests/cyclic_abc/run.sh +++ b/tests/cyclic_abc/run.sh @@ -183,4 +183,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/ddl_puller_lag/run.sh b/tests/ddl_puller_lag/run.sh index 6e22eee3001..568921291d5 100644 --- a/tests/ddl_puller_lag/run.sh +++ b/tests/ddl_puller_lag/run.sh @@ -104,4 +104,5 @@ trap stop_tidb_cluster EXIT prepare $* sleep 180 sql_test $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/ddl_reentrant/run.sh b/tests/ddl_reentrant/run.sh index 9379f4cbc58..33fe92ed978 100644 --- a/tests/ddl_reentrant/run.sh +++ b/tests/ddl_reentrant/run.sh @@ -153,4 +153,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/ddl_sequence/run.sh b/tests/ddl_sequence/run.sh index 6a09106aae5..6ae14d5551b 100644 --- a/tests/ddl_sequence/run.sh +++ b/tests/ddl_sequence/run.sh @@ -39,4 +39,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/drop_many_tables/run.sh b/tests/drop_many_tables/run.sh index 7b0c5e3b10b..d67cb20df8b 100644 --- a/tests/drop_many_tables/run.sh +++ b/tests/drop_many_tables/run.sh @@ -39,4 +39,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/force_replicate_table/run.sh b/tests/force_replicate_table/run.sh index 05e46b53bd8..52db341c601 100755 --- a/tests/force_replicate_table/run.sh +++ b/tests/force_replicate_table/run.sh @@ -83,4 +83,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/gc_safepoint/run.sh b/tests/gc_safepoint/run.sh index cd6480c13e9..8006728c93f 100755 --- a/tests/gc_safepoint/run.sh +++ b/tests/gc_safepoint/run.sh @@ -126,4 +126,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! 
>>>>>>" diff --git a/tests/generate_column/run.sh b/tests/generate_column/run.sh index 683fb59458d..6d78ed7e0a2 100644 --- a/tests/generate_column/run.sh +++ b/tests/generate_column/run.sh @@ -40,4 +40,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/kafka_messages/run.sh b/tests/kafka_messages/run.sh index a61ed24efdd..ce1378b080b 100755 --- a/tests/kafka_messages/run.sh +++ b/tests/kafka_messages/run.sh @@ -126,4 +126,5 @@ function run_batch_size_limit() { trap stop_tidb_cluster EXIT run_length_limit $* run_batch_size_limit $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/kafka_sink_error_resume/run.sh b/tests/kafka_sink_error_resume/run.sh index 4bf215802b1..8a48c4d26a3 100755 --- a/tests/kafka_sink_error_resume/run.sh +++ b/tests/kafka_sink_error_resume/run.sh @@ -71,4 +71,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/kill_owner_with_ddl/run.sh b/tests/kill_owner_with_ddl/run.sh index fac0922165c..7c927475517 100755 --- a/tests/kill_owner_with_ddl/run.sh +++ b/tests/kill_owner_with_ddl/run.sh @@ -54,7 +54,7 @@ function run() { run_sql "CREATE table kill_owner_with_ddl.t1 (id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} check_table_exists "kill_owner_with_ddl.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sink/MySQLSinkExecDDLDelay=return(true)' + export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sink/MySQLSinkExecDDLDelay=return(true);github.com/pingcap/ticdc/cdc/ownerFlushIntervalInject=return(0)' kill_cdc_and_restart $pd_addr $WORK_DIR $CDC_BINARY for i in $(seq 2 3); do @@ -85,4 +85,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/many_pk_or_uk/run.sh b/tests/many_pk_or_uk/run.sh index 85f37027235..b82ffd1e1bb 100755 --- a/tests/many_pk_or_uk/run.sh +++ b/tests/many_pk_or_uk/run.sh @@ -41,4 +41,5 @@ GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester. check_table_exists test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} check_sync_diff $WORK_DIR $CUR/diff_config.toml cleanup_process $CDC_BINARY +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/move_table/run.sh b/tests/move_table/run.sh index 2ee1d437a49..abeaa98ab78 100644 --- a/tests/move_table/run.sh +++ b/tests/move_table/run.sh @@ -57,4 +57,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/multi_capture/run.sh b/tests/multi_capture/run.sh index 9ce92bb9d1d..434e07fc654 100755 --- a/tests/multi_capture/run.sh +++ b/tests/multi_capture/run.sh @@ -61,4 +61,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! 
>>>>>>" diff --git a/tests/multi_source/run.sh b/tests/multi_source/run.sh index b92d867a48c..806eb212a1b 100755 --- a/tests/multi_source/run.sh +++ b/tests/multi_source/run.sh @@ -46,4 +46,5 @@ check_table_exists mark.finish_mark_4 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 check_table_exists mark.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 check_sync_diff $WORK_DIR $CUR/diff_config.toml cleanup_process $CDC_BINARY +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/new_ci_collation/run.sh b/tests/new_ci_collation/run.sh index 5dbf7faeefc..9384f6ce021 100755 --- a/tests/new_ci_collation/run.sh +++ b/tests/new_ci_collation/run.sh @@ -44,4 +44,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/partition_table/run.sh b/tests/partition_table/run.sh index 62e34c01717..d5ca4a73a1f 100644 --- a/tests/partition_table/run.sh +++ b/tests/partition_table/run.sh @@ -40,4 +40,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/processor_panic/run.sh b/tests/processor_panic/run.sh index 9b4110f3952..9d7691f32cd 100644 --- a/tests/processor_panic/run.sh +++ b/tests/processor_panic/run.sh @@ -44,5 +44,5 @@ GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester. check_table_exists test.end_mark_table ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/diff_config.toml cleanup_process $CDC_BINARY - +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/processor_resolved_ts_fallback/run.sh b/tests/processor_resolved_ts_fallback/run.sh index f7e25cae783..952f922c3b4 100755 --- a/tests/processor_resolved_ts_fallback/run.sh +++ b/tests/processor_resolved_ts_fallback/run.sh @@ -57,4 +57,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/processor_stop_delay/run.sh b/tests/processor_stop_delay/run.sh index dda56acb46b..9326d857ba4 100644 --- a/tests/processor_stop_delay/run.sh +++ b/tests/processor_stop_delay/run.sh @@ -51,4 +51,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/resolve_lock/run.sh b/tests/resolve_lock/run.sh index 57491f47e2e..6b328aa85f2 100755 --- a/tests/resolve_lock/run.sh +++ b/tests/resolve_lock/run.sh @@ -41,4 +41,5 @@ GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester. check_table_exists test.t2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} check_sync_diff $WORK_DIR $CUR/diff_config.toml cleanup_process $CDC_BINARY +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/row_format/run.sh b/tests/row_format/run.sh index 136953cd658..6de926fa812 100644 --- a/tests/row_format/run.sh +++ b/tests/row_format/run.sh @@ -55,4 +55,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! 
>>>>>>" diff --git a/tests/simple/run.sh b/tests/simple/run.sh index 7292c84bbc1..12b36031035 100644 --- a/tests/simple/run.sh +++ b/tests/simple/run.sh @@ -102,4 +102,5 @@ function sql_test() { trap stop_tidb_cluster EXIT prepare $* sql_test $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/sink_hang/run.sh b/tests/sink_hang/run.sh index 88a0208ddfb..e8c4da8d05e 100644 --- a/tests/sink_hang/run.sh +++ b/tests/sink_hang/run.sh @@ -67,4 +67,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/sink_retry/run.sh b/tests/sink_retry/run.sh index f09f565ef2d..b9e9b103dba 100755 --- a/tests/sink_retry/run.sh +++ b/tests/sink_retry/run.sh @@ -45,4 +45,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/split_region/run.sh b/tests/split_region/run.sh index 63eeeec07d6..b4f2d85e07a 100755 --- a/tests/split_region/run.sh +++ b/tests/split_region/run.sh @@ -48,4 +48,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/syncpoint/run.sh b/tests/syncpoint/run.sh index 28b5f7b181e..9ff3c83fc84 100755 --- a/tests/syncpoint/run.sh +++ b/tests/syncpoint/run.sh @@ -154,4 +154,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/tiflash/run.sh b/tests/tiflash/run.sh index 51b22135002..3eade89d0bb 100644 --- a/tests/tiflash/run.sh +++ b/tests/tiflash/run.sh @@ -39,4 +39,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/unified_sorter/run.sh b/tests/unified_sorter/run.sh index 1079755f5a7..2b2e99b857c 100755 --- a/tests/unified_sorter/run.sh +++ b/tests/unified_sorter/run.sh @@ -67,4 +67,5 @@ function run() { trap stop_tidb_cluster EXIT run $* +check_cdc_state_log $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"