Skip to content

Commit

Permalink
api,config(ticdc): support update transaction-atomicity and protocol …
Browse files Browse the repository at this point in the history
…via config file (pingcap#7980)

close pingcap#7935
  • Loading branch information
CharlesCheung96 committed Jan 10, 2023
1 parent f69c799 commit cc19ca1
Show file tree
Hide file tree
Showing 10 changed files with 404 additions and 104 deletions.
20 changes: 15 additions & 5 deletions cdc/api/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,22 +188,32 @@ func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
}
}

var sinkConfigUpdated, sinkURIUpdated bool
if len(changefeedConfig.IgnoreTxnStartTs) != 0 {
newInfo.Config.Filter.IgnoreTxnStartTs = changefeedConfig.IgnoreTxnStartTs
}

if changefeedConfig.MounterWorkerNum != 0 {
newInfo.Config.Mounter.WorkerNum = changefeedConfig.MounterWorkerNum
}

if changefeedConfig.SinkConfig != nil {
sinkConfigUpdated = true
newInfo.Config.Sink = changefeedConfig.SinkConfig
}

// verify sink_uri
if changefeedConfig.SinkURI != "" {
sinkURIUpdated = true
newInfo.SinkURI = changefeedConfig.SinkURI
if err := sink.Validate(ctx, changefeedConfig.SinkURI, newInfo.Config, newInfo.Opts); err != nil {
}

if sinkConfigUpdated || sinkURIUpdated {
// check sink config is compatible with sinkURI
newCfg := newInfo.Config.Sink
oldCfg := oldInfo.Config.Sink
err := newCfg.CheckCompatibilityWithSinkURI(oldCfg, newInfo.SinkURI)
if err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

if err := sink.Validate(ctx, newInfo.SinkURI, newInfo.Config, newInfo.Opts); err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
Expand Down
5 changes: 3 additions & 2 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/pkg/cyclic/mark"
cerror "github.com/pingcap/tiflow/pkg/errors"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -341,7 +342,7 @@ func (info *ChangeFeedInfo) fixMySQLSinkProtocol() {
return
}

if config.IsMqScheme(uri.Scheme) {
if sink.IsMQScheme(uri.Scheme) {
return
}

Expand All @@ -366,7 +367,7 @@ func (info *ChangeFeedInfo) fixMQSinkProtocol() {
return
}

if !config.IsMqScheme(uri.Scheme) {
if !sink.IsMQScheme(uri.Scheme) {
return
}

Expand Down
10 changes: 10 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,11 @@ error = '''
illegal parameter for sorter: %s
'''

["CDC:ErrIncompatibleSinkConfig"]
error = '''
incompatible configuration in sink uri(%s) and config file(%s), please try to update the configuration only through sink uri
'''

["CDC:ErrIndexKeyTableNotFound"]
error = '''
table not found with index ID %d in index kv
Expand Down Expand Up @@ -936,6 +941,11 @@ error = '''
sink uri invalid '%s'
'''

["CDC:ErrSinkUnknownProtocol"]
error = '''
unknown '%s' message protocol for sink
'''

["CDC:ErrSnapshotLostByGC"]
error = '''
fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts %d is earlier than or equal to GC safepoint at %d
Expand Down
3 changes: 1 addition & 2 deletions pkg/cmd/cli/cli_changefeed_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ import (
"fmt"
"strings"

"github.com/pingcap/tiflow/pkg/etcd"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context"
"github.com/pingcap/tiflow/pkg/cmd/factory"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/security"
"github.com/r3labs/diff"
"github.com/spf13/cobra"
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,9 @@ const (
"worker-num": 3
},
"sink": {
"dispatchers": null,
"transaction-atomicity": "",
"protocol": "open-protocol",
"dispatchers": null,
"column-selectors": [
{
"matcher": [
Expand All @@ -170,7 +171,6 @@ const (
}
],
"schema-registry": "",
"transaction-atomicity": "",
"encoder-concurrency": 16
},
"cyclic-replication": {
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestReplicaConfigOutDated(t *testing.T) {
{Matcher: []string{"a.c"}, DispatcherRule: "r2"},
{Matcher: []string{"a.d"}, DispatcherRule: "r2"},
}
conf.Sink.TxnAtomicity = unknowTxnAtomicity
conf.Sink.TxnAtomicity = unknownTxnAtomicity
require.Equal(t, conf, conf2)
}

Expand Down
195 changes: 133 additions & 62 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,62 @@ package config
import (
"fmt"
"net/url"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/log"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"go.uber.org/zap"
)

// DefaultMaxMessageBytes sets the default value for max-message-bytes.
const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M

// AtomicityLevel represents the atomicity level of a changefeed.
type AtomicityLevel string

const (
// unknowTxnAtomicity is the default atomicity level, which is invalid and will
// be set to a valid value when initializing sink in processor.
unknowTxnAtomicity AtomicityLevel = ""
// TxnAtomicityKey specifies the key of the transaction-atomicity in the SinkURI.
TxnAtomicityKey = "transaction-atomicity"
// defaultTxnAtomicity is the default atomicity level.
defaultTxnAtomicity = noneTxnAtomicity

// unknownTxnAtomicity is an invalid atomicity level and will be treated as
// defaultTxnAtomicity when initializing sink in processor.
unknownTxnAtomicity AtomicityLevel = ""
// noneTxnAtomicity means atomicity of transactions is not guaranteed
noneTxnAtomicity AtomicityLevel = "none"

// tableTxnAtomicity means atomicity of single table transactions is guaranteed.
tableTxnAtomicity AtomicityLevel = "table"

// globalTxnAtomicity means atomicity of cross table transactions is guaranteed, which
// is currently not supported by TiCDC.
// globalTxnAtomicity AtomicityLevel = "global"

defaultMqTxnAtomicity AtomicityLevel = noneTxnAtomicity
defaultMysqlTxnAtomicity AtomicityLevel = noneTxnAtomicity
)

// AtomicityLevel represents the atomicity level of a changefeed.
type AtomicityLevel string

// ShouldSplitTxn returns whether the sink should split txn.
func (l AtomicityLevel) ShouldSplitTxn() bool {
if l == unknownTxnAtomicity {
l = defaultTxnAtomicity
}
return l == noneTxnAtomicity
}

func (l AtomicityLevel) validate(scheme string) error {
switch l {
case unknownTxnAtomicity:
case noneTxnAtomicity:
// Do nothing here to avoid modifying the persistence parameters.
case tableTxnAtomicity:
// MqSink only support `noneTxnAtomicity`.
if sink.IsMQScheme(scheme) {
errMsg := fmt.Sprintf("%s level atomicity is not supported by %s scheme", l, scheme)
return cerror.ErrSinkURIInvalid.GenWithStackByArgs(errMsg)
}
default:
errMsg := fmt.Sprintf("%s level atomicity is not supported by %s scheme", l, scheme)
return cerror.ErrSinkURIInvalid.GenWithStackByArgs(errMsg)
}
return nil
}

// ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value.
var ForceEnableOldValueProtocols = []string{
ProtocolCanal.String(),
Expand All @@ -62,11 +81,12 @@ var ForceEnableOldValueProtocols = []string{

// SinkConfig represents sink config for a changefeed
type SinkConfig struct {
TxnAtomicity AtomicityLevel `toml:"transaction-atomicity" json:"transaction-atomicity"`
Protocol string `toml:"protocol" json:"protocol"`

DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers"`
Protocol string `toml:"protocol" json:"protocol"`
ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors"`
SchemaRegistry string `toml:"schema-registry" json:"schema-registry"`
TxnAtomicity AtomicityLevel `toml:"transaction-atomicity" json:"transaction-atomicity"`
EncoderConcurrency int `toml:"encoder-concurrency" json:"encoder-concurrency"`
}

Expand All @@ -88,7 +108,7 @@ type ColumnSelector struct {
}

func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL, enableOldValue bool) error {
if err := s.applyParameter(sinkURI); err != nil {
if err := s.validateAndAdjustSinkURI(sinkURI); err != nil {
return err
}

Expand Down Expand Up @@ -126,61 +146,37 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL, enableOldValue bool) er
return nil
}

// applyParameter fill the `ReplicaConfig` and `TxnAtomicity` by sinkURI.
func (s *SinkConfig) applyParameter(sinkURI *url.URL) error {
// validateAndAdjustSinkURI validate and adjust `Protocol` and `TxnAtomicity` by sinkURI.
func (s *SinkConfig) validateAndAdjustSinkURI(sinkURI *url.URL) error {
if sinkURI == nil {
return nil
}
params := sinkURI.Query()

txnAtomicity := params.Get("transaction-atomicity")
switch AtomicityLevel(txnAtomicity) {
case unknowTxnAtomicity:
// Set default value according to scheme.
if IsMqScheme(sinkURI.Scheme) {
s.TxnAtomicity = defaultMqTxnAtomicity
} else {
s.TxnAtomicity = defaultMysqlTxnAtomicity
}
case noneTxnAtomicity:
s.TxnAtomicity = noneTxnAtomicity
case tableTxnAtomicity:
// MqSink only support `noneTxnAtomicity`.
if IsMqScheme(sinkURI.Scheme) {
log.Warn("The configuration of transaction-atomicity is incompatible with scheme",
zap.Any("txnAtomicity", s.TxnAtomicity),
zap.String("scheme", sinkURI.Scheme),
zap.String("protocol", s.Protocol))
s.TxnAtomicity = defaultMqTxnAtomicity
} else {
s.TxnAtomicity = tableTxnAtomicity
if err := s.applyParameterBySinkURI(sinkURI); err != nil {
if !cerror.ErrIncompatibleSinkConfig.Equal(err) {
return err
}
default:
errMsg := fmt.Sprintf("%s level atomicity is not supported by %s scheme",
txnAtomicity, sinkURI.Scheme)
return cerror.ErrSinkURIInvalid.GenWithStackByArgs(errMsg)
// Ignore `ErrIncompatibleSinkConfig` here to:
// 1. Keep compatibility with old version.
// 2. Avoid throwing error when create changefeed.
log.Warn("sink-uri is not compatible with the sink config, "+
"the configuration in sink URI will be used", zap.Error(err))
}

protocolFromURI := params.Get(ProtocolKey)
if protocolFromURI != "" {
if s.Protocol != "" {
log.Warn(
fmt.Sprintf("protocol is specified in both sink URI and config file"+
"the value in sink URI will be used"+
"protocol in sink URI:%s, protocol in config file:%s",
protocolFromURI, s.Protocol))
}
s.Protocol = protocolFromURI
// validate that TxnAtomicity is valid and compatible with the scheme.
if err := s.TxnAtomicity.validate(sinkURI.Scheme); err != nil {
return err
}

// validate that protocol is compatible with the scheme
if IsMqScheme(sinkURI.Scheme) {
// Validate that protocol is compatible with the scheme. For testing purposes,
// any protocol should be legal for blackhole.
if sink.IsMQScheme(sinkURI.Scheme) {
var protocol Protocol
err := protocol.FromString(s.Protocol)
if err != nil {
return err
}
} else if s.Protocol != "" {
} else if sink.IsMySQLCompatibleScheme(sinkURI.Scheme) && s.Protocol != "" {
return cerror.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("protocol %s "+
"is incompatible with %s scheme", s.Protocol, sinkURI.Scheme))
}
Expand All @@ -191,8 +187,83 @@ func (s *SinkConfig) applyParameter(sinkURI *url.URL) error {
return nil
}

// IsMqScheme returns true if the scheme belong to mq schema.
func IsMqScheme(scheme string) bool {
return scheme == "kafka" || scheme == "kafka+ssl" ||
scheme == "pulsar" || scheme == "pulsar+ssl"
// applyParameterBySinkURI parse sinkURI and set `Protocol` and `TxnAtomicity` to `SinkConfig`.
// Return:
// - ErrIncompatibleSinkConfig to terminate `updated` changefeed operation.
func (s *SinkConfig) applyParameterBySinkURI(sinkURI *url.URL) error {
if sinkURI == nil {
return nil
}

cfgInSinkURI := map[string]string{}
cfgInFile := map[string]string{}
params := sinkURI.Query()

txnAtomicityFromURI := AtomicityLevel(params.Get(TxnAtomicityKey))
if txnAtomicityFromURI != unknownTxnAtomicity {
if s.TxnAtomicity != unknownTxnAtomicity && s.TxnAtomicity != txnAtomicityFromURI {
cfgInSinkURI[TxnAtomicityKey] = string(txnAtomicityFromURI)
cfgInFile[TxnAtomicityKey] = string(s.TxnAtomicity)
}
s.TxnAtomicity = txnAtomicityFromURI
}

protocolFromURI := params.Get(ProtocolKey)
if protocolFromURI != "" {
if s.Protocol != "" && s.Protocol != protocolFromURI {
cfgInSinkURI[ProtocolKey] = protocolFromURI
cfgInFile[ProtocolKey] = s.Protocol
}
s.Protocol = protocolFromURI
}

getError := func() error {
if len(cfgInSinkURI) != len(cfgInFile) {
log.Panic("inconsistent configuration items in sink uri and configuration file",
zap.Any("cfgInSinkURI", cfgInSinkURI), zap.Any("cfgInFile", cfgInFile))
}
if len(cfgInSinkURI) == 0 && len(cfgInFile) == 0 {
return nil
}
getErrMsg := func(cfgIn map[string]string) string {
var errMsg strings.Builder
for k, v := range cfgIn {
errMsg.WriteString(fmt.Sprintf("%s=%s, ", k, v))
}
return errMsg.String()[0 : errMsg.Len()-2]
}
return cerror.ErrIncompatibleSinkConfig.GenWithStackByArgs(
getErrMsg(cfgInSinkURI), getErrMsg(cfgInFile))
}
return getError()
}

// CheckCompatibilityWithSinkURI check whether the sinkURI is compatible with the sink config.
func (s *SinkConfig) CheckCompatibilityWithSinkURI(
oldSinkConfig *SinkConfig, sinkURIStr string,
) error {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}

cfgParamsChanged := s.Protocol != oldSinkConfig.Protocol ||
s.TxnAtomicity != oldSinkConfig.TxnAtomicity

isURIParamsChanged := func(oldCfg SinkConfig) bool {
err := oldCfg.applyParameterBySinkURI(sinkURI)
return cerror.ErrIncompatibleSinkConfig.Equal(err)
}
uriParamsChanged := isURIParamsChanged(*oldSinkConfig)

if !uriParamsChanged && !cfgParamsChanged {
return nil
}

compatibilityError := s.applyParameterBySinkURI(sinkURI)
if uriParamsChanged && cerror.ErrIncompatibleSinkConfig.Equal(compatibilityError) {
// Ignore compatibility error if the sinkURI make such changes.
return nil
}
return compatibilityError
}
Loading

0 comments on commit cc19ca1

Please sign in to comment.