Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner: implement a basic owner that can calculate resolvedTS #60

Merged
merged 11 commits into from
Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 255 additions & 14 deletions cdc/roles/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ package roles

import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"go.uber.org/zap"
)

type ProcessTableInfo struct {
Expand All @@ -24,25 +31,259 @@ type ProcessTableInfo struct {

// Owner is used to process etcd information for a capture with owner role.
// It exposes the resolved and checkpoint timestamps of each ChangeFeed and a
// Run entry point that drives the owner logic.
type Owner interface {
// GetResolvedTS gets the resolvedTS of the ChangeFeed identified by
// changeFeedID.
GetResolvedTS(changeFeedID string) (uint64, error)

// ResolverFunc registers the resolver into Owner.
// NOTE(review): ownerImpl in this file defines no ResolverFunc method; this
// looks like a removed-side line of the diff rendering — confirm.
ResolverFunc(ctx context.Context, resolver func(ctx context.Context, changeFeedID string, captureID string) (uint64, error)) error
// GetCheckpointTS gets the CheckpointTS of the ChangeFeed identified by
// changeFeedID.
GetCheckpointTS(changeFeedID string) (uint64, error)

// CheckpointerFunc registers the checkpointer into Owner.
// NOTE(review): ownerImpl in this file defines no CheckpointerFunc method; this
// looks like a removed-side line of the diff rendering — confirm.
CheckpointerFunc(ctx context.Context, checkpointer func(ctx context.Context, changeFeedID string, captureID string) (uint64, error)) error
// Run handles the Owner logic. In ownerImpl the call blocks, executing one
// round per time.Duration interval until the context is done or a round
// returns an error.
Run(context.Context, time.Duration) error
}

// UpdateResolvedTSFunc registers a updater into Owner, which can update resolvedTS to ETCD
UpdateResolvedTSFunc(ctx context.Context, updater func(ctx context.Context, changeFeedID string, resolvedTS uint64) error)
// CaptureID is the string identifier of a capture.
type CaptureID = string
// ChangeFeedID is the string identifier of a ChangeFeed.
type ChangeFeedID = string
// ProcessorsInfos holds, for one ChangeFeed, the SubChangeFeedInfo of each
// capture keyed by that capture's ID.
type ProcessorsInfos = map[CaptureID]*SubChangeFeedInfo

// UpdateCheckpointTSFunc registers a updater into Owner, which can update checkpointTS to ETCD
UpdateCheckpointTSFunc(ctx context.Context, updater func(ctx context.Context, changeFeedID string, checkpointTS uint64) error)
// ChangeFeedStatus is the state of a ChangeFeed as tracked by the owner.
type ChangeFeedStatus int

// GetResolvedTS gets ResolvedTS of a ChangeFeed
GetResolvedTS(ctx context.Context, changeFeedID string) (uint64, error)
// The states a ChangeFeed moves through, as used by ownerImpl in this file.
const (
// ChangeFeedUnknown is the zero value of ChangeFeedStatus.
ChangeFeedUnknown ChangeFeedStatus = iota
// ChangeFeedSyncDML is the only state in which calcResolvedTS advances the
// ChangeFeed's resolvedTS.
ChangeFeedSyncDML
// ChangeFeedWaitToExecDDL is set by calcResolvedTS when the resolvedTS has
// been capped at the FinishedTS of the next pending DDL job.
ChangeFeedWaitToExecDDL
// ChangeFeedExecDDL is set by handleDDL while the DDL job is being executed
// in the background.
ChangeFeedExecDDL
// ChangeFeedDDLExecuteFailed is set by handleDDL when executing the DDL job
// returned an error.
ChangeFeedDDLExecuteFailed
)

// GetCheckpointTS gets CheckpointTS of a ChangeFeed
GetCheckpointTS(ctx context.Context, changeFeedID string) (uint64, error)
// String returns the display name of the status. Values that are not one of
// the declared ChangeFeedStatus constants yield the empty string.
func (s ChangeFeedStatus) String() string {
	// Display names indexed by status value.
	names := [...]string{
		ChangeFeedUnknown:          "Unknown",
		ChangeFeedSyncDML:          "SyncDML",
		ChangeFeedWaitToExecDDL:    "WaitToExecDDL",
		ChangeFeedExecDDL:          "ExecDDL",
		ChangeFeedDDLExecuteFailed: "DDLExecuteFailed",
	}
	if s < 0 || int(s) >= len(names) {
		return ""
	}
	return names[s]
}

// Run a goroutine to handle Owner logic
Run(ctx context.Context) error
// ChangeFeedInfo is the owner's in-memory state for a single ChangeFeed.
type ChangeFeedInfo struct {
// status is the current state of the ChangeFeed.
status ChangeFeedStatus
// resolvedTS is the value last computed by ownerImpl.calcResolvedTS.
resolvedTS uint64
// checkpointTS is returned by CheckpointTS/GetCheckpointTS; nothing in this
// file assigns it.
checkpointTS uint64

// processorInfos is the per-capture info last loaded by
// ownerImpl.loadChangeFeedInfos.
processorInfos ProcessorsInfos
// ddlCurrentIndex is the index into ownerImpl.ddlJobHistory of the next DDL
// job for this ChangeFeed; handleDDL increments it after a successful
// execution.
ddlCurrentIndex int
}

// Status returns the current status of the ChangeFeed.
func (c *ChangeFeedInfo) Status() ChangeFeedStatus {
return c.status
}

// ResolvedTS returns the resolvedTS stored for the ChangeFeed.
func (c *ChangeFeedInfo) ResolvedTS() uint64 {
return c.resolvedTS
}

// CheckpointTS returns the checkpointTS stored for the ChangeFeed.
func (c *ChangeFeedInfo) CheckpointTS() uint64 {
return c.checkpointTS
}

// ddlExecResult is the outcome of one background DDL execution, sent on
// ownerImpl.finishedDDLJob and consumed by handleDDL.
type ddlExecResult struct {
// changeFeedID identifies the ChangeFeed the DDL job was executed for.
changeFeedID ChangeFeedID
// job is the DDL job that was executed.
job *model.Job
// err is the error returned by execDDLFunc, or nil on success.
err error
}

// TODO edit sub change feed
// ownerImpl holds the owner's state: the tracked ChangeFeeds, the pulled DDL
// job history, and the injected functions used to pull and execute DDL jobs
// and to read and write ChangeFeed information.
type ownerImpl struct {
// changeFeedInfos is the in-memory state of every tracked ChangeFeed.
changeFeedInfos map[ChangeFeedID]*ChangeFeedInfo

// ddlPullFunc returns a DDL resolved timestamp and a batch of DDL jobs;
// pullDDLJob stores the former and appends the latter to ddlJobHistory.
ddlPullFunc func() (uint64, []*model.Job, error)
zier-one marked this conversation as resolved.
Show resolved Hide resolved
// execDDLFunc executes one DDL job; handleDDL calls it in a goroutine.
execDDLFunc func(*model.Job) error
// readChangeFeedInfos supplies the per-capture infos of each ChangeFeed to
// loadChangeFeedInfos.
readChangeFeedInfos func(context.Context) (map[ChangeFeedID]ProcessorsInfos, error)
// writeChangeFeedInfos receives changeFeedInfos from flushChangeFeedInfos.
writeChangeFeedInfos func(context.Context, map[ChangeFeedID]*ChangeFeedInfo) error
zier-one marked this conversation as resolved.
Show resolved Hide resolved

// ddlResolvedTS is the timestamp returned by the latest ddlPullFunc call.
ddlResolvedTS uint64
// targetTS is the starting upper bound used by calcResolvedTS; nothing in
// this file assigns it.
targetTS uint64
// ddlJobHistory accumulates every DDL job returned by ddlPullFunc, in order.
ddlJobHistory []*model.Job
// finishedDDLJob carries DDL execution results from the goroutines started
// by handleDDL back to handleDDL.
finishedDDLJob chan ddlExecResult

// l is read-locked by GetResolvedTS and GetCheckpointTS.
// NOTE(review): the matching write lock in run is commented out — confirm
// whether the readers are actually protected.
l sync.RWMutex
}

// loadChangeFeedInfos reads the per-capture infos of every ChangeFeed through
// readChangeFeedInfos and stores them on the matching entries of
// o.changeFeedInfos. ChangeFeeds that are returned by readChangeFeedInfos but
// are not already present in o.changeFeedInfos are ignored. A read error is
// returned wrapped with errors.Trace.
func (o *ownerImpl) loadChangeFeedInfos(ctx context.Context) error {
infos, err := o.readChangeFeedInfos(ctx)
if err != nil {
return errors.Trace(err)
}
// TODO: handle changefeed changed and the table of sub changefeed changed
// TODO: find the first index of one changefeed in ddl jobs
for changeFeedId, etcdChangeFeedInfo := range infos {
// Only refresh ChangeFeeds the owner already tracks.
if cfInfo, exist := o.changeFeedInfos[changeFeedId]; exist {
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
cfInfo.processorInfos = etcdChangeFeedInfo
}
}
return nil
}

// flushChangeFeedInfos hands the in-memory ChangeFeed state to
// writeChangeFeedInfos and returns its result wrapped with errors.Trace.
func (o *ownerImpl) flushChangeFeedInfos(ctx context.Context) error {
	writeErr := o.writeChangeFeedInfos(ctx, o.changeFeedInfos)
	return errors.Trace(writeErr)
}

// pullDDLJob calls ddlPullFunc once. On success it records the returned DDL
// resolved timestamp and appends the returned jobs to the DDL job history; on
// failure it leaves the owner state untouched and returns the traced error.
func (o *ownerImpl) pullDDLJob() error {
	ts, jobs, pullErr := o.ddlPullFunc()
	if pullErr != nil {
		return errors.Trace(pullErr)
	}

	o.ddlJobHistory = append(o.ddlJobHistory, jobs...)
	o.ddlResolvedTS = ts
	return nil
}

// getChangeFeedInfo looks up the tracked state of the ChangeFeed identified by
// changeFeedID. It returns a NotFound error when the owner does not track it.
// It takes no lock itself.
func (o *ownerImpl) getChangeFeedInfo(changeFeedID string) (*ChangeFeedInfo, error) {
	if cf, ok := o.changeFeedInfos[changeFeedID]; ok {
		return cf, nil
	}
	return nil, errors.NotFoundf("ChangeFeed(%s) in ChangeFeedInfos", changeFeedID)
}

// GetResolvedTS returns the resolvedTS of the ChangeFeed identified by
// changeFeedID, holding the read lock for the duration of the lookup. It
// returns 0 and the lookup error when the ChangeFeed is not tracked.
func (o *ownerImpl) GetResolvedTS(changeFeedID string) (uint64, error) {
	o.l.RLock()
	defer o.l.RUnlock()

	cf, lookupErr := o.getChangeFeedInfo(changeFeedID)
	if lookupErr != nil {
		return 0, lookupErr
	}
	return cf.ResolvedTS(), nil
}

// GetCheckpointTS returns the checkpointTS of the ChangeFeed identified by
// changeFeedID, holding the read lock for the duration of the lookup. It
// returns 0 and the lookup error when the ChangeFeed is not tracked.
func (o *ownerImpl) GetCheckpointTS(changeFeedID string) (uint64, error) {
	o.l.RLock()
	defer o.l.RUnlock()

	cf, lookupErr := o.getChangeFeedInfo(changeFeedID)
	if lookupErr != nil {
		return 0, lookupErr
	}
	return cf.CheckpointTS(), nil
}

// calcResolvedTS recomputes resolvedTS for every ChangeFeed whose status is
// ChangeFeedSyncDML. The new value is the minimum of o.targetTS, the
// ResolvedTS of each processor, the DDL resolved timestamp, and the FinishedTS
// of the ChangeFeed's next pending DDL job. When that DDL job is the limiting
// factor the ChangeFeed moves to ChangeFeedWaitToExecDDL. It returns the
// traced error of pullDDLJob if pulling DDL jobs fails.
func (o *ownerImpl) calcResolvedTS() error {
for _, cfInfo := range o.changeFeedInfos {
// ChangeFeeds in any other state keep their current resolvedTS.
if cfInfo.status != ChangeFeedSyncDML {
continue
}
// Start from targetTS and lower it to the slowest processor.
minResolvedTS := o.targetTS
for _, pStatus := range cfInfo.processorInfos {
if minResolvedTS > pStatus.ResolvedTS {
minResolvedTS = pStatus.ResolvedTS
}
}
// The processors are ahead of the known DDL resolved timestamp: pull DDL
// jobs to refresh it, then clamp to it if it is still behind.
if minResolvedTS > o.ddlResolvedTS {
zier-one marked this conversation as resolved.
Show resolved Hide resolved
if err := o.pullDDLJob(); err != nil {
return errors.Trace(err)
}
if minResolvedTS > o.ddlResolvedTS {
minResolvedTS = o.ddlResolvedTS
}
}
// Do not advance past the next DDL job that this ChangeFeed has not
// executed yet; stop there and wait for the DDL to be executed.
if len(o.ddlJobHistory) > cfInfo.ddlCurrentIndex &&
minResolvedTS > o.ddlJobHistory[cfInfo.ddlCurrentIndex].BinlogInfo.FinishedTS {
minResolvedTS = o.ddlJobHistory[cfInfo.ddlCurrentIndex].BinlogInfo.FinishedTS
cfInfo.status = ChangeFeedWaitToExecDDL
}
cfInfo.resolvedTS = minResolvedTS
}
return nil
}

func (o *ownerImpl) handleDDL(ctx context.Context) error {
FOR1:
for changeFeedID, cfInfo := range o.changeFeedInfos {
if cfInfo.status != ChangeFeedWaitToExecDDL {
break
zier-one marked this conversation as resolved.
Show resolved Hide resolved
}
todoDDLJob := o.ddlJobHistory[cfInfo.ddlCurrentIndex]
for _, pInfo := range cfInfo.processorInfos {
if pInfo.CheckPointTS != todoDDLJob.BinlogInfo.FinishedTS {
continue FOR1
}
}
cfInfo.status = ChangeFeedExecDDL
go func() {
err := o.execDDLFunc(todoDDLJob)
o.finishedDDLJob <- ddlExecResult{changeFeedID, todoDDLJob, err}
}()
}
FOR2:
for {
select {
case ddlExecRes, ok := <-o.finishedDDLJob:
if !ok {
break FOR2
}
cfInfo, exist := o.changeFeedInfos[ddlExecRes.changeFeedID]
if !exist {
return errors.NotFoundf("the changeFeedStatus of ChangeFeed(%s)", ddlExecRes.changeFeedID)
}
if ddlExecRes.err != nil {
cfInfo.status = ChangeFeedDDLExecuteFailed
log.Error("Execute DDL failed",
zap.String("ChangeFeedID", ddlExecRes.changeFeedID),
zap.Error(ddlExecRes.err),
zap.Reflect("ddlJob", ddlExecRes.job))
continue
}
if cfInfo.status != ChangeFeedExecDDL {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would this be set to something unexpected? It seems that it's just set before the goroutine starts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be sure, If cfInfo.status is changed for some reason(such as concurrency problem), the correctness of data will be broke, we need a obviously error.

log.Fatal("changeFeedState must be ChangeFeedExecDDL when DDL is executed",
zap.String("ChangeFeedID", ddlExecRes.changeFeedID),
zap.String("ChangeFeedState", cfInfo.status.String()))
}
cfInfo.ddlCurrentIndex += 1
cfInfo.status = ChangeFeedSyncDML
default:
suzaku marked this conversation as resolved.
Show resolved Hide resolved
break FOR2
}
}
return nil
}

// Run executes one owner round (see run) each time tickTime elapses. It
// blocks until ctx is done, in which case it returns ctx.Err(), or until a
// round fails, in which case it returns that round's error. The interval is
// measured from the end of one round to the start of the next.
func (o *ownerImpl) Run(ctx context.Context, tickTime time.Duration) error {
	// One reusable timer instead of time.After per iteration: no allocation
	// per tick, and the timer is released as soon as Run returns.
	timer := time.NewTimer(tickTime)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			if err := o.run(ctx); err != nil {
				return err
			}
			// The channel was just drained by the receive above, so Reset can
			// be called directly.
			timer.Reset(tickTime)
		}
	}
}

// run performs a single owner round: load the latest processor infos,
// recompute the resolved timestamps, drive DDL execution, and write the
// ChangeFeed state back. The first failing step aborts the round and its
// error is returned wrapped with errors.Trace.
func (o *ownerImpl) run(ctx context.Context) error {
	// NOTE(review): the write lock below is disabled, while GetResolvedTS and
	// GetCheckpointTS take the read lock — confirm whether this is intended.
	//o.l.Lock()
	//defer o.l.Unlock()
	if loadErr := o.loadChangeFeedInfos(ctx); loadErr != nil {
		return errors.Trace(loadErr)
	}
	if calcErr := o.calcResolvedTS(); calcErr != nil {
		return errors.Trace(calcErr)
	}
	if ddlErr := o.handleDDL(ctx); ddlErr != nil {
		return errors.Trace(ddlErr)
	}
	if flushErr := o.flushChangeFeedInfos(ctx); flushErr != nil {
		return errors.Trace(flushErr)
	}
	return nil
}
Loading