Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api,config(ticdc): support update transaction-atomicity and protocol via config file #7980

Merged
merged 2 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions cdc/api/v1/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,22 +205,32 @@ func VerifyUpdateChangefeedConfig(ctx context.Context,
}
}

var sinkConfigUpdated, sinkURIUpdated bool
if len(changefeedConfig.IgnoreTxnStartTs) != 0 {
newInfo.Config.Filter.IgnoreTxnStartTs = changefeedConfig.IgnoreTxnStartTs
}

if changefeedConfig.MounterWorkerNum != 0 {
newInfo.Config.Mounter.WorkerNum = changefeedConfig.MounterWorkerNum
}

if changefeedConfig.SinkConfig != nil {
sinkConfigUpdated = true
newInfo.Config.Sink = changefeedConfig.SinkConfig
}

// verify sink_uri
if changefeedConfig.SinkURI != "" {
sinkURIUpdated = true
newInfo.SinkURI = changefeedConfig.SinkURI
if err := sink.Validate(ctx, changefeedConfig.SinkURI, newInfo.Config); err != nil {
}

if sinkConfigUpdated || sinkURIUpdated {
// check sink config is compatible with sinkURI
newCfg := newInfo.Config.Sink
oldCfg := oldInfo.Config.Sink
err := newCfg.CheckCompatibilityWithSinkURI(oldCfg, newInfo.SinkURI)
if err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

if err := sink.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
Expand Down
42 changes: 26 additions & 16 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,13 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
kvStorage tidbkv.Storage,
checkpointTs uint64,
) (*model.ChangeFeedInfo, *model.UpstreamInfo, error) {
// update changefeed info
newInfo, err := oldInfo.Clone()
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error())
}

// verify TargetTs
var configUpdated, sinkURIUpdated bool
if cfg.TargetTs != 0 {
if cfg.TargetTs <= newInfo.StartTs {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStack(
Expand All @@ -308,43 +309,52 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
}
newInfo.TargetTs = cfg.TargetTs
}

// verify replica config
if cfg.Engine != "" {
newInfo.Engine = cfg.Engine
}
if cfg.ReplicaConfig != nil {
configUpdated = true
newInfo.Config = cfg.ReplicaConfig.ToInternalReplicaConfig()
err = newInfo.Config.ValidateAndAdjust(nil)
if err != nil {
return nil, nil, err
}
}
if cfg.SinkURI != "" {
sinkURIUpdated = true
newInfo.SinkURI = cfg.SinkURI
}

// verify changefeed info
f, err := filter.NewFilter(newInfo.Config, "")
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.
GenWithStackByArgs(errors.Cause(err).Error())
}

tableInfos, _, _, err := entry.VerifyTables(f, kvStorage, checkpointTs)
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

err = f.Verify(tableInfos)
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.
GenWithStackByArgs(errors.Cause(err).Error())
}

// verify SinkURI
if cfg.SinkURI != "" {
newInfo.SinkURI = cfg.SinkURI
if configUpdated || sinkURIUpdated {
log.Info("config or sink uri updated, check the compatibility",
zap.Bool("configUpdated", configUpdated),
zap.Bool("sinkURIUpdated", sinkURIUpdated))
// check sink config is compatible with sinkURI
newCfg := newInfo.Config.Sink
oldCfg := oldInfo.Config.Sink
err := newCfg.CheckCompatibilityWithSinkURI(oldCfg, newInfo.SinkURI)
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

if err := sink.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
if cfg.Engine != "" {
newInfo.Engine = cfg.Engine
}

// update and verify up info
newUpInfo, err := oldUpInfo.Clone()
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error())
Expand All @@ -364,9 +374,9 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
if cfg.CertAllowedCN != nil {
newUpInfo.CertAllowedCN = cfg.CertAllowedCN
}

changefeedInfoChanged := diff.Changed(oldInfo, newInfo)
upstreamInfoChanged := diff.Changed(oldUpInfo, newUpInfo)

if !changefeedInfoChanged && !upstreamInfoChanged {
return nil, nil, cerror.ErrChangefeedUpdateRefused.
GenWithStackByArgs("changefeed config is the same with the old one, do nothing")
Expand Down
1 change: 0 additions & 1 deletion cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.Nil(t, err)
// startTs can not be updated
require.Equal(t, "none", string(newCfInfo.Config.Sink.TxnAtomicity))
newCfInfo.Config.Sink.TxnAtomicity = ""
require.Equal(t, uint64(0), newCfInfo.StartTs)
require.Equal(t, uint64(10), newCfInfo.TargetTs)
Expand Down
35 changes: 17 additions & 18 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,12 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
return
}

cfInfo, err := h.capture.StatusProvider().GetChangeFeedInfo(ctx, changefeedID)
oldCfInfo, err := h.capture.StatusProvider().GetChangeFeedInfo(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}
if cfInfo.State != model.StateStopped {
if oldCfInfo.State != model.StateStopped {
_ = c.Error(cerror.ErrChangefeedUpdateRefused.
GenWithStackByArgs("can only update changefeed config when it is stopped"))
return
Expand All @@ -232,40 +232,39 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
_ = c.Error(err)
return
}
cfInfo.Namespace = changefeedID.Namespace
cfInfo.ID = changefeedID.ID
upInfo, err := etcdClient.GetUpstreamInfo(ctx, cfInfo.UpstreamID,
cfInfo.Namespace)
oldCfInfo.Namespace = changefeedID.Namespace
oldCfInfo.ID = changefeedID.ID
OldUpInfo, err := etcdClient.GetUpstreamInfo(ctx, oldCfInfo.UpstreamID,
oldCfInfo.Namespace)
if err != nil {
_ = c.Error(err)
return
}

updateCfConfig := &ChangefeedConfig{}
updateCfConfig.ReplicaConfig = ToAPIReplicaConfig(cfInfo.Config)
if err = c.BindJSON(updateCfConfig); err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}

if err = h.helpers.verifyUpstream(ctx, updateCfConfig, cfInfo); err != nil {
if err = h.helpers.verifyUpstream(ctx, updateCfConfig, oldCfInfo); err != nil {
_ = c.Error(errors.Trace(err))
return
}

log.Info("Old ChangeFeed and Upstream Info",
zap.String("changefeedInfo", cfInfo.String()),
zap.Any("upstreamInfo", upInfo))
zap.String("changefeedInfo", oldCfInfo.String()),
zap.Any("upstreamInfo", OldUpInfo))

var pdAddrs []string
var credentials *security.Credential
if upInfo != nil {
pdAddrs = strings.Split(upInfo.PDEndpoints, ",")
if OldUpInfo != nil {
pdAddrs = strings.Split(OldUpInfo.PDEndpoints, ",")
credentials = &security.Credential{
CAPath: upInfo.CAPath,
CertPath: upInfo.CertPath,
KeyPath: upInfo.KeyPath,
CertAllowedCN: upInfo.CertAllowedCN,
CAPath: OldUpInfo.CAPath,
CertPath: OldUpInfo.CertPath,
KeyPath: OldUpInfo.KeyPath,
CertAllowedCN: OldUpInfo.CertAllowedCN,
}
}
if len(updateCfConfig.PDAddrs) != 0 {
Expand All @@ -277,8 +276,8 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
if err != nil {
_ = c.Error(errors.Trace(err))
}
newCfInfo, newUpInfo, err := h.helpers.
verifyUpdateChangefeedConfig(ctx, updateCfConfig, cfInfo, upInfo, storage, cfStatus.CheckpointTs)
newCfInfo, newUpInfo, err := h.helpers.verifyUpdateChangefeedConfig(ctx,
updateCfConfig, oldCfInfo, OldUpInfo, storage, cfStatus.CheckpointTs)
if err != nil {
_ = c.Error(errors.Trace(err))
return
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,11 @@ error = '''
illegal parameter for sorter: %s
'''

["CDC:ErrIncompatibleSinkConfig"]
error = '''
incompatible configuration in sink uri(%s) and config file(%s), please try to update the configuration only through sink uri
'''

["CDC:ErrIndexKeyTableNotFound"]
error = '''
table not found with index ID %d in index kv
Expand Down
Loading