Skip to content

Commit

Permalink
Merge branch 'master' into fix-resolve-lock-false-positive
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Jul 12, 2021
2 parents 4f3eff7 + 526d5de commit 838fd10
Show file tree
Hide file tree
Showing 12 changed files with 1,117 additions and 616 deletions.
154 changes: 45 additions & 109 deletions cdc/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/owner"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/logutil"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -76,62 +75,29 @@ func (s *Server) handleResignOwner(w http.ResponseWriter, req *http.Request) {
writeError(w, http.StatusBadRequest, cerror.ErrSupportPostOnly.GenWithStackByArgs())
return
}
if config.NewReplicaImpl {
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
err := s.captureV2.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.AsyncStop()
return nil
})
handleOwnerResp(w, err)
return
}
s.ownerLock.RLock()
if s.owner == nil {

if s.capture == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
s.ownerLock.RUnlock()
return
}
// Resign is a complex process that needs to be synchronized because
// it happens in two separate goroutines
//
// Imagine that we have goroutines A and B
// A1. Notify the owner to exit
// B1. The owner exits gracefully
// A2. Delete the leader key until the owner has exited
// B2. Restart to campaign
//
// A2 must occur between B1 and B2, so we register the Resign process
// as the stepDown function which is called when the owner exited.
s.owner.Close(req.Context(), func(ctx context.Context) error {
return s.capture.Resign(ctx)
err := s.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.AsyncStop()
return nil
})
s.ownerLock.RUnlock()
s.setOwner(nil)
handleOwnerResp(w, nil)
handleOwnerResp(w, err)
}

func (s *Server) handleChangefeedAdmin(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
writeError(w, http.StatusBadRequest, cerror.ErrSupportPostOnly.GenWithStackByArgs())
return
}
if !config.NewReplicaImpl {
s.ownerLock.RLock()
defer s.ownerLock.RUnlock()
if s.owner == nil {
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
} else {
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}

if s.capture == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}

err := req.ParseForm()
Expand Down Expand Up @@ -160,14 +126,12 @@ func (s *Server) handleChangefeedAdmin(w http.ResponseWriter, req *http.Request)
Type: model.AdminJobType(typ),
Opts: opts,
}
if config.NewReplicaImpl {
err = s.captureV2.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.EnqueueJob(job)
return nil
})
} else {
err = s.owner.EnqueueJob(job)
}

err = s.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.EnqueueJob(job)
return nil
})

handleOwnerResp(w, err)
}

Expand All @@ -176,19 +140,11 @@ func (s *Server) handleRebalanceTrigger(w http.ResponseWriter, req *http.Request
writeError(w, http.StatusBadRequest, cerror.ErrSupportPostOnly.GenWithStackByArgs())
return
}
if !config.NewReplicaImpl {
s.ownerLock.RLock()
defer s.ownerLock.RUnlock()
if s.owner == nil {
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
} else {
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}

if s.capture == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}

err := req.ParseForm()
Expand All @@ -202,14 +158,12 @@ func (s *Server) handleRebalanceTrigger(w http.ResponseWriter, req *http.Request
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID))
return
}
if config.NewReplicaImpl {
err = s.captureV2.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.TriggerRebalance(changefeedID)
return nil
})
} else {
s.owner.TriggerRebalance(changefeedID)
}

err = s.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.TriggerRebalance(changefeedID)
return nil
})

handleOwnerResp(w, err)
}

Expand All @@ -218,19 +172,11 @@ func (s *Server) handleMoveTable(w http.ResponseWriter, req *http.Request) {
writeError(w, http.StatusBadRequest, cerror.ErrSupportPostOnly.GenWithStackByArgs())
return
}
if !config.NewReplicaImpl {
s.ownerLock.RLock()
defer s.ownerLock.RUnlock()
if s.owner == nil {
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
} else {
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}

if s.capture == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}

err := req.ParseForm()
Expand All @@ -257,14 +203,12 @@ func (s *Server) handleMoveTable(w http.ResponseWriter, req *http.Request) {
cerror.ErrAPIInvalidParam.GenWithStack("invalid tableID: %s", tableIDStr))
return
}
if config.NewReplicaImpl {
err = s.captureV2.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.ManualSchedule(changefeedID, to, tableID)
return nil
})
} else {
s.owner.ManualSchedule(changefeedID, to, tableID)
}

err = s.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.ManualSchedule(changefeedID, to, tableID)
return nil
})

handleOwnerResp(w, err)
}

Expand All @@ -273,19 +217,11 @@ func (s *Server) handleChangefeedQuery(w http.ResponseWriter, req *http.Request)
writeError(w, http.StatusBadRequest, cerror.ErrSupportPostOnly.GenWithStackByArgs())
return
}
if !config.NewReplicaImpl {
s.ownerLock.RLock()
defer s.ownerLock.RUnlock()
if s.owner == nil {
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
} else {
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}

if s.capture == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}

err := req.ParseForm()
Expand Down
41 changes: 5 additions & 36 deletions cdc/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,31 +113,9 @@ func (s *Server) writeEtcdInfo(ctx context.Context, cli *kv.CDCEtcdClient, w io.
}

func (s *Server) handleDebugInfo(w http.ResponseWriter, req *http.Request) {
if config.NewReplicaImpl {
s.captureV2.WriteDebugInfo(w)
fmt.Fprintf(w, "\n\n*** etcd info ***:\n\n")
s.writeEtcdInfo(req.Context(), s.etcdClient, w)
return
}
s.ownerLock.RLock()
defer s.ownerLock.RUnlock()
if s.owner != nil {
fmt.Fprintf(w, "\n\n*** owner info ***:\n\n")
s.owner.writeDebugInfo(w)
}

fmt.Fprintf(w, "\n\n*** processors info ***:\n\n")
if config.NewReplicaImpl {
s.capture.processorManager.WriteDebugInfo(w)
} else {
for _, p := range s.capture.processors {
p.writeDebugInfo(w)
fmt.Fprintf(w, "\n")
}
}

s.capture.WriteDebugInfo(w)
fmt.Fprintf(w, "\n\n*** etcd info ***:\n\n")
s.writeEtcdInfo(req.Context(), &s.capture.etcdClient, w)
s.writeEtcdInfo(req.Context(), s.etcdClient, w)
}

func (s *Server) handleStatus(w http.ResponseWriter, req *http.Request) {
Expand All @@ -146,20 +124,11 @@ func (s *Server) handleStatus(w http.ResponseWriter, req *http.Request) {
GitHash: version.GitHash,
Pid: os.Getpid(),
}
if config.NewReplicaImpl {
if s.captureV2 != nil {
st.ID = s.captureV2.Info().ID
st.IsOwner = s.captureV2.IsOwner()
}
writeData(w, st)
return
}
s.ownerLock.RLock()
defer s.ownerLock.RUnlock()

if s.capture != nil {
st.ID = s.capture.info.ID
st.ID = s.capture.Info().ID
st.IsOwner = s.capture.IsOwner()
}
st.IsOwner = s.owner != nil
writeData(w, st)
}

Expand Down
2 changes: 2 additions & 0 deletions cdc/kv/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func (s *etcdSuite) TestOpChangeFeedDetail(c *check.C) {
ctx := context.Background()
detail := &model.ChangeFeedInfo{
SinkURI: "root@tcp(127.0.0.1:3306)/mysql",
SortDir: "/old-version/sorter",
}
cfID := "test-op-cf"

Expand All @@ -236,6 +237,7 @@ func (s *etcdSuite) TestOpChangeFeedDetail(c *check.C) {
d, err := s.client.GetChangeFeedInfo(ctx, cfID)
c.Assert(err, check.IsNil)
c.Assert(d.SinkURI, check.Equals, detail.SinkURI)
c.Assert(d.SortDir, check.Equals, detail.SortDir)

err = s.client.LeaseGuardDeleteChangeFeedInfo(ctx, cfID, sess.Lease())
c.Assert(err, check.IsNil)
Expand Down
4 changes: 3 additions & 1 deletion cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ type ChangeFeedInfo struct {
AdminJobType AdminJobType `json:"admin-job-type"`
Engine SortEngine `json:"sort-engine"`
// SortDir is deprecated
SortDir string `json:"-"`
// it cannot be set by user in changefeed level, any assignment to it should be ignored.
// but can be fetched for backward compatibility
SortDir string `json:"sort-dir"`

Config *config.ReplicaConfig `json:"config"`
State FeedState `json:"state"`
Expand Down
1 change: 1 addition & 0 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (s *configSuite) TestFillV1(c *check.C) {
},
StartTs: 417136892416622595,
Engine: "memory",
SortDir: ".",
Config: &config.ReplicaConfig{
CaseSensitive: true,
Filter: &config.FilterConfig{
Expand Down
8 changes: 2 additions & 6 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,9 @@ LOOP:
failpoint.Inject("NewChangefeedNoRetryError", func() {
failpoint.Return(cerror.ErrStartTsBeforeGC.GenWithStackByArgs(checkpointTs-300, checkpointTs))
})

failpoint.Inject("NewChangefeedRetryError", func() {
failpoint.Return(errors.New("failpoint injected retriable error"))
})

if c.state.Info.Config.CheckGCSafePoint {
err := util.CheckSafetyOfStartTs(ctx, ctx.GlobalVars().PDClient, c.state.ID, checkpointTs)
if err != nil {
Expand Down Expand Up @@ -262,10 +260,8 @@ func (c *changefeed) preflightCheck(captures map[model.CaptureID]*model.CaptureI
if status == nil {
status = &model.ChangeFeedStatus{
// the changefeed status is nil when the changefeed is just created.
// the txn in start ts is not replicated at that time,
// so the checkpoint ts and resolved ts should less than start ts.
ResolvedTs: c.state.Info.StartTs - 1,
CheckpointTs: c.state.Info.StartTs - 1,
ResolvedTs: c.state.Info.StartTs,
CheckpointTs: c.state.Info.StartTs,
AdminJobType: model.AdminNone,
}
return status, true, nil
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (s *changefeedSuite) TestInitialize(c *check.C) {
// initialize
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs-1)
c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs)
}

func (s *changefeedSuite) TestHandleError(c *check.C) {
Expand All @@ -186,7 +186,7 @@ func (s *changefeedSuite) TestHandleError(c *check.C) {
// handle error
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs-1)
c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs)
c.Assert(state.Info.Error.Message, check.Equals, "fake error")
}

Expand Down
8 changes: 4 additions & 4 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"go.uber.org/zap"
)

// feedStateManager manages the feedState of a changefeed
// when the error, admin job happened, the feedStateManager is responsible for controlling the feedState
// feedStateManager manages the ReactorState of a changefeed
// when a error or a admin job occurs, the feedStateManager is responsible for controlling the ReactorState
type feedStateManager struct {
state *model.ChangefeedReactorState
shouldBeRunning bool
Expand Down Expand Up @@ -112,15 +112,15 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
m.shouldBeRunning = false
jobsPending = true
m.patchState(model.StateRemoved)
// remove changefeed info and state
// remove changefeed info and status
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
return nil, true, nil
})
m.state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
return nil, true, nil
})
checkpointTs := m.state.Info.GetCheckpointTs(m.state.Status)
log.Info("the changefeed removed", zap.String("changefeed-id", m.state.ID), zap.Uint64("checkpoint-ts", checkpointTs))
log.Info("the changefeed is removed", zap.String("changefeed-id", m.state.ID), zap.Uint64("checkpoint-ts", checkpointTs))

case model.AdminResume:
switch m.state.Info.State {
Expand Down
3 changes: 3 additions & 0 deletions cdc/owner/gc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ func (m *gcManager) updateGCSafePoint(ctx cdcContext.Context, state *model.Globa
failpoint.Inject("InjectActualGCSafePoint", func(val failpoint.Value) {
actual = uint64(val.(int))
})
if actual == minCheckpointTs {
log.Info("update gc safe point success", zap.Uint64("gcSafePointTs", minCheckpointTs))
}
if actual > minCheckpointTs {
log.Warn("update gc safe point failed, the gc safe point is larger than checkpointTs", zap.Uint64("actual", actual), zap.Uint64("checkpointTs", minCheckpointTs))
}
Expand Down
Loading

0 comments on commit 838fd10

Please sign in to comment.