diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 358bfee893c..fcc88329c0c 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -118,12 +118,18 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) state := rawState.(*orchestrator.GlobalReactorState) o.captures = state.Captures o.updateMetrics(state) + + // handleJobs() should be called before clusterVersionConsistent(), because + // when there are different versions of cdc nodes in the cluster, + // the admin job may not be processed all the time. And http api relies on + // admin job, which will cause all http api unavailable. + o.handleJobs() + if !o.clusterVersionConsistent(state.Captures) { // sleep one second to avoid printing too much log time.Sleep(1 * time.Second) return state, nil } - // Owner should update GC safepoint before initializing changefeed, so // changefeed can remove its "ticdc-creating" service GC safepoint during // initializing. @@ -133,7 +139,6 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) return nil, errors.Trace(err) } - o.handleJobs() for changefeedID, changefeedState := range state.Changefeeds { if changefeedState.Info == nil { o.cleanUpChangefeed(changefeedState) diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index 25ad60ae026..c500eff2787 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math" + "sync" "time" "github.com/pingcap/check" @@ -79,6 +80,7 @@ func (s *ownerSuite) TestCreateRemoveChangefeed(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(false) owner, state, tester := createOwner4Test(ctx, c) + changefeedID := "test-changefeed" changefeedInfo := &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), @@ -362,3 +364,78 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) { case <-ch: } } + +// make sure handleJobs works well even if there is two different +// version of captures in the cluster +func (s *ownerSuite) TestHandleJobsDontBlock(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(false) + owner, state, tester := createOwner4Test(ctx, c) + statusProvider := owner.StatusProvider() + // work well + changefeedID := "test-changefeed" + changefeedInfo := &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + } + changefeedStr, err := changefeedInfo.Marshal() + c.Assert(err, check.IsNil) + cdcKey := etcd.CDCKey{ + Tp: etcd.CDCKeyTypeChangefeedInfo, + ChangefeedID: changefeedID, + } + tester.MustUpdate(cdcKey.String(), []byte(changefeedStr)) + _, err = owner.Tick(ctx, state) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.changefeeds, check.HasKey, changefeedID) + + // add an non-consistent version capture + captureInfo := &model.CaptureInfo{ + ID: "capture-id-owner-test", + AdvertiseAddr: "127.0.0.1:0000", + Version: " v0.0.1-test-only", + } + cdcKey = etcd.CDCKey{ + Tp: etcd.CDCKeyTypeCapture, + CaptureID: captureInfo.ID, + } + v, err := captureInfo.Marshal() + c.Assert(err, check.IsNil) + tester.MustUpdate(cdcKey.String(), v) + + // try to add another changefeed + changefeedID1 := "test-changefeed1" + changefeedInfo1 := &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + } + changefeedStr1, err := changefeedInfo1.Marshal() + c.Assert(err, check.IsNil) + cdcKey = etcd.CDCKey{ + Tp: etcd.CDCKeyTypeChangefeedInfo, + ChangefeedID: changefeedID, + } + tester.MustUpdate(cdcKey.String(), []byte(changefeedStr1)) + _, err = owner.Tick(ctx, state) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + // make sure this changefeed add failed, which means that owner are return + // in clusterVersionConsistent check + c.Assert(owner.changefeeds[changefeedID1], check.IsNil) + + // make sure statusProvider works well + ctx1, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + infos, err := statusProvider.GetAllChangeFeedInfo(ctx1) + c.Assert(err, check.IsNil) + c.Assert(infos[changefeedID], check.NotNil) + c.Assert(infos[changefeedID1], check.IsNil) + wg.Done() + }() + _, err = owner.Tick(ctx, state) + c.Assert(err, check.IsNil) +}