From 279cb04f16618cba18f02df7626b449040db8b70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?= Date: Wed, 8 Dec 2021 13:35:56 +0800 Subject: [PATCH] This is an automated cherry-pick of #3747 Signed-off-by: ti-chi-bot --- cdc/sink/codec/avro.go | 3 +- cdc/sink/codec/canal.go | 5 +- cdc/sink/codec/canal_flat.go | 20 +++- cdc/sink/codec/craft.go | 7 +- cdc/sink/codec/interface.go | 58 +++-------- cdc/sink/codec/interface_test.go | 13 +-- cdc/sink/codec/json.go | 12 +-- cdc/sink/codec/maxwell.go | 5 +- cdc/sink/mq.go | 44 ++++++-- pkg/cmd/cli/cli_changefeed_create.go | 13 ++- pkg/config/mq_sink_protocol.go | 75 ++++++++++++++ pkg/config/mq_sink_protocol_test.go | 100 ++++++++++++++++++ pkg/config/replica_config.go | 146 +++++++++++++++++++++++++++ pkg/config/replica_config_test.go | 95 +++++++++++++++++ pkg/config/sink.go | 31 ++++++ pkg/config/sink_test.go | 81 +++++++++++++++ 16 files changed, 627 insertions(+), 81 deletions(-) create mode 100644 pkg/config/mq_sink_protocol.go create mode 100644 pkg/config/mq_sink_protocol_test.go create mode 100644 pkg/config/replica_config.go create mode 100644 pkg/config/replica_config_test.go create mode 100644 pkg/config/sink_test.go diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index c64bc828921..f99a89ea6ea 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/security" "github.com/pingcap/ticdc/pkg/util" @@ -88,7 +89,7 @@ func (a *AvroEventBatchEncoder) SetTimeZone(tz *time.Location) { // AppendRowChangedEvent appends a row change event to the encoder // NOTE: the encoder can only store one RowChangedEvent! 
func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) { - mqMessage := NewMQMessage(ProtocolAvro, nil, nil, e.CommitTs, model.MqMessageTypeRow, &e.Table.Schema, &e.Table.Table) + mqMessage := NewMQMessage(config.ProtocolAvro, nil, nil, e.CommitTs, model.MqMessageTypeRow, &e.Table.Schema, &e.Table.Table) if !e.IsDelete() { res, err := avroEncode(e.Table, a.valueSchemaManager, e.TableInfoVersion, e.Columns, a.tz) diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index cde3bba4f79..873bd3eb596 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -23,6 +23,7 @@ import ( "github.com/golang/protobuf/proto" // nolint:staticcheck "github.com/pingcap/errors" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" canal "github.com/pingcap/ticdc/proto/canal" mm "github.com/pingcap/tidb/parser/model" @@ -373,7 +374,7 @@ func (d *CanalEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } - return newDDLMQMessage(ProtocolCanal, nil, b, e), nil + return newDDLMQMessage(config.ProtocolCanal, nil, b, e), nil } // Build implements the EventBatchEncoder interface @@ -391,7 +392,7 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage { if err != nil { log.Panic("Error when serializing Canal packet", zap.Error(err)) } - ret := NewMQMessage(ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil) + ret := NewMQMessage(config.ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil) d.messages.Reset() d.resetPacket() return []*MQMessage{ret} diff --git a/cdc/sink/codec/canal_flat.go b/cdc/sink/codec/canal_flat.go index 5929488db07..1e455c124cc 100644 --- a/cdc/sink/codec/canal_flat.go +++ b/cdc/sink/codec/canal_flat.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" + 
"github.com/pingcap/ticdc/pkg/config" cerrors "github.com/pingcap/ticdc/pkg/errors" canal "github.com/pingcap/ticdc/proto/canal" "go.uber.org/zap" @@ -184,7 +185,20 @@ func (c *CanalFlatEventBatchEncoder) newFlatMessageForDDL(e *model.DDLEvent) *ca // EncodeCheckpointEvent is no-op func (c *CanalFlatEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) { +<<<<<<< HEAD return nil, nil +======= + if !c.enableTiDBExtension { + return nil, nil + } + + msg := c.newFlatMessage4CheckpointEvent(ts) + value, err := json.Marshal(msg) + if err != nil { + return nil, cerrors.WrapError(cerrors.ErrCanalEncodeFailed, err) + } + return newResolvedMQMessage(config.ProtocolCanalJSON, nil, value, ts), nil +>>>>>>> bedf904e7 (config(ticdc): Fix old value configuration check for maxwell protocol (#3747)) } // AppendRowChangedEvent implements the interface EventBatchEncoder @@ -222,7 +236,7 @@ func (c *CanalFlatEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessa if err != nil { return nil, cerrors.WrapError(cerrors.ErrCanalEncodeFailed, err) } - return newDDLMQMessage(ProtocolCanalJSON, nil, value, e), nil + return newDDLMQMessage(config.ProtocolCanalJSON, nil, value, e), nil } // Build implements the EventBatchEncoder interface @@ -237,7 +251,11 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage { log.Panic("CanalFlatEventBatchEncoder", zap.Error(err)) return nil } +<<<<<<< HEAD ret[i] = NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table) +======= + ret[i] = NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MqMessageTypeRow, msg.getSchema(), msg.getTable()) +>>>>>>> bedf904e7 (config(ticdc): Fix old value configuration check for maxwell protocol (#3747)) } c.resolvedBuf = c.resolvedBuf[0:0] return ret diff --git a/cdc/sink/codec/craft.go b/cdc/sink/codec/craft.go index 2766f12950b..99599b89edb 100644 --- a/cdc/sink/codec/craft.go +++ b/cdc/sink/codec/craft.go @@ -21,6 
+21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/cdc/sink/codec/craft" + "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" ) @@ -38,7 +39,7 @@ type CraftEventBatchEncoder struct { // EncodeCheckpointEvent implements the EventBatchEncoder interface func (e *CraftEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) { - return newResolvedMQMessage(ProtocolCraft, nil, craft.NewResolvedEventEncoder(e.allocator, ts).Encode(), ts), nil + return newResolvedMQMessage(config.ProtocolCraft, nil, craft.NewResolvedEventEncoder(e.allocator, ts).Encode(), ts), nil } func (e *CraftEventBatchEncoder) flush() { @@ -46,7 +47,7 @@ func (e *CraftEventBatchEncoder) flush() { ts := headers.GetTs(0) schema := headers.GetSchema(0) table := headers.GetTable(0) - e.messageBuf = append(e.messageBuf, NewMQMessage(ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table)) + e.messageBuf = append(e.messageBuf, NewMQMessage(config.ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table)) } // AppendRowChangedEvent implements the EventBatchEncoder interface @@ -65,7 +66,7 @@ func (e *CraftEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, // EncodeDDLEvent implements the EventBatchEncoder interface func (e *CraftEventBatchEncoder) EncodeDDLEvent(ev *model.DDLEvent) (*MQMessage, error) { - return newDDLMQMessage(ProtocolCraft, nil, craft.NewDDLEventEncoder(e.allocator, ev).Encode(), ev), nil + return newDDLMQMessage(config.ProtocolCraft, nil, craft.NewDDLEventEncoder(e.allocator, ev).Encode(), ev), nil } // Build implements the EventBatchEncoder interface diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index 45d56ff1d9a..bb2be44682a 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -16,11 +16,11 @@ package codec import ( "context" "encoding/binary" - 
"strings" "time" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/security" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -63,7 +63,7 @@ type MQMessage struct { Schema *string // schema Table *string // table Type model.MqMessageType // type - Protocol Protocol // protocol + Protocol config.Protocol // protocol } // maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client. @@ -83,17 +83,17 @@ func (m *MQMessage) PhysicalTime() time.Time { return oracle.GetTimeFromTS(m.Ts) } -func newDDLMQMessage(proto Protocol, key, value []byte, event *model.DDLEvent) *MQMessage { +func newDDLMQMessage(proto config.Protocol, key, value []byte, event *model.DDLEvent) *MQMessage { return NewMQMessage(proto, key, value, event.CommitTs, model.MqMessageTypeDDL, &event.TableInfo.Schema, &event.TableInfo.Table) } -func newResolvedMQMessage(proto Protocol, key, value []byte, ts uint64) *MQMessage { +func newResolvedMQMessage(proto config.Protocol, key, value []byte, ts uint64) *MQMessage { return NewMQMessage(proto, key, value, ts, model.MqMessageTypeResolved, nil, nil) } // NewMQMessage should be used when creating a MQMessage struct. // It copies the input byte slices to avoid any surprises in asynchronous MQ writes. 
-func NewMQMessage(proto Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType, schema, table *string) *MQMessage { +func NewMQMessage(proto config.Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType, schema, table *string) *MQMessage { ret := &MQMessage{ Key: nil, Value: nil, @@ -143,58 +143,24 @@ const ( EncoderNeedSyncWrite ) -// Protocol is the protocol of the mq message -type Protocol int - -// Enum types of the Protocol -const ( - ProtocolDefault Protocol = iota - ProtocolCanal - ProtocolAvro - ProtocolMaxwell - ProtocolCanalJSON - ProtocolCraft -) - -// FromString converts the protocol from string to Protocol enum type -func (p *Protocol) FromString(protocol string) { - switch strings.ToLower(protocol) { - case "default": - *p = ProtocolDefault - case "canal": - *p = ProtocolCanal - case "avro": - *p = ProtocolAvro - case "maxwell": - *p = ProtocolMaxwell - case "canal-json": - *p = ProtocolCanalJSON - case "craft": - *p = ProtocolCraft - default: - *p = ProtocolDefault - log.Warn("can't support codec protocol, using default protocol", zap.String("protocol", protocol)) - } -} - type EncoderBuilder interface { Build(ctx context.Context) (EventBatchEncoder, error) } // NewEventBatchEncoderBuilder returns an EncoderBuilder -func NewEventBatchEncoderBuilder(p Protocol, credential *security.Credential, opts map[string]string) (EncoderBuilder, error) { +func NewEventBatchEncoderBuilder(p config.Protocol, credential *security.Credential, opts map[string]string) (EncoderBuilder, error) { switch p { - case ProtocolDefault: + case config.ProtocolDefault: return newJSONEventBatchEncoderBuilder(opts), nil - case ProtocolCanal: + case config.ProtocolCanal: return newCanalEventBatchEncoderBuilder(opts), nil - case ProtocolAvro: + case config.ProtocolAvro: return newAvroEventBatchEncoderBuilder(credential, opts) - case ProtocolMaxwell: + case config.ProtocolMaxwell: return newMaxwellEventBatchEncoderBuilder(opts), nil - case 
ProtocolCanalJSON: + case config.ProtocolCanalJSON: return newCanalFlatEventBatchEncoderBuilder(opts), nil - case ProtocolCraft: + case config.ProtocolCraft: return newCraftEventBatchEncoderBuilder(opts), nil default: log.Warn("unknown codec protocol value of EventBatchEncoder, use open-protocol as the default", zap.Int("protocol_value", int(p))) diff --git a/cdc/sink/codec/interface_test.go b/cdc/sink/codec/interface_test.go index 6c28b52fb78..4bf669e7067 100644 --- a/cdc/sink/codec/interface_test.go +++ b/cdc/sink/codec/interface_test.go @@ -16,6 +16,7 @@ package codec import ( "github.com/pingcap/check" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/util/testleak" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -54,7 +55,7 @@ func (s *codecInterfaceSuite) TestCreate(c *check.C) { CommitTs: 5678, } - msg := NewMQMessage(ProtocolDefault, []byte("key1"), []byte("value1"), rowEvent.CommitTs, model.MqMessageTypeRow, &rowEvent.Table.Schema, &rowEvent.Table.Table) + msg := NewMQMessage(config.ProtocolDefault, []byte("key1"), []byte("value1"), rowEvent.CommitTs, model.MqMessageTypeRow, &rowEvent.Table.Schema, &rowEvent.Table.Table) c.Assert(msg.Key, check.BytesEquals, []byte("key1")) c.Assert(msg.Value, check.BytesEquals, []byte("value1")) @@ -62,7 +63,7 @@ func (s *codecInterfaceSuite) TestCreate(c *check.C) { c.Assert(msg.Type, check.Equals, model.MqMessageTypeRow) c.Assert(*msg.Schema, check.Equals, rowEvent.Table.Schema) c.Assert(*msg.Table, check.Equals, rowEvent.Table.Table) - c.Assert(msg.Protocol, check.Equals, ProtocolDefault) + c.Assert(msg.Protocol, check.Equals, config.ProtocolDefault) job := &timodel.Job{ ID: 1071, @@ -100,21 +101,21 @@ func (s *codecInterfaceSuite) TestCreate(c *check.C) { ddlEvent := &model.DDLEvent{} ddlEvent.FromJob(job, preTableInfo) - msg = newDDLMQMessage(ProtocolMaxwell, nil, []byte("value1"), ddlEvent) + msg = 
newDDLMQMessage(config.ProtocolMaxwell, nil, []byte("value1"), ddlEvent) c.Assert(msg.Key, check.IsNil) c.Assert(msg.Value, check.BytesEquals, []byte("value1")) c.Assert(msg.Ts, check.Equals, ddlEvent.CommitTs) c.Assert(msg.Type, check.Equals, model.MqMessageTypeDDL) c.Assert(*msg.Schema, check.Equals, ddlEvent.TableInfo.Schema) c.Assert(*msg.Table, check.Equals, ddlEvent.TableInfo.Table) - c.Assert(msg.Protocol, check.Equals, ProtocolMaxwell) + c.Assert(msg.Protocol, check.Equals, config.ProtocolMaxwell) - msg = newResolvedMQMessage(ProtocolCanal, []byte("key1"), nil, 1234) + msg = newResolvedMQMessage(config.ProtocolCanal, []byte("key1"), nil, 1234) c.Assert(msg.Key, check.BytesEquals, []byte("key1")) c.Assert(msg.Value, check.IsNil) c.Assert(msg.Ts, check.Equals, uint64(1234)) c.Assert(msg.Type, check.Equals, model.MqMessageTypeResolved) c.Assert(msg.Schema, check.IsNil) c.Assert(msg.Table, check.IsNil) - c.Assert(msg.Protocol, check.Equals, ProtocolCanal) + c.Assert(msg.Protocol, check.Equals, config.ProtocolCanal) } diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index 723ef7853dd..b1c4805d599 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -25,12 +25,12 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "go.uber.org/zap" - - "github.com/pingcap/ticdc/cdc/model" ) const ( @@ -398,7 +398,7 @@ func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, er valueBuf := new(bytes.Buffer) valueBuf.Write(valueLenByte[:]) - ret := newResolvedMQMessage(ProtocolDefault, keyBuf.Bytes(), valueBuf.Bytes(), ts) + ret := newResolvedMQMessage(config.ProtocolDefault, keyBuf.Bytes(), valueBuf.Bytes(), ts) return ret, nil } @@ -442,7 +442,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e 
*model.RowChangedEvent) versionHead := make([]byte, 8) binary.BigEndian.PutUint64(versionHead, BatchVersion1) - d.messageBuf = append(d.messageBuf, NewMQMessage(ProtocolDefault, versionHead, nil, 0, model.MqMessageTypeRow, nil, nil)) + d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolDefault, versionHead, nil, 0, model.MqMessageTypeRow, nil, nil)) d.curBatchSize = 0 } @@ -501,7 +501,7 @@ func (d *JSONEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, e valueBuf.Write(valueLenByte[:]) valueBuf.Write(value) - ret := newDDLMQMessage(ProtocolDefault, keyBuf.Bytes(), valueBuf.Bytes(), e) + ret := newDDLMQMessage(config.ProtocolDefault, keyBuf.Bytes(), valueBuf.Bytes(), e) return ret, nil } @@ -512,7 +512,7 @@ func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) { return nil } /* there could be multiple types of event encoded within a single message which means the type is not sure */ - ret := NewMQMessage(ProtocolDefault, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeUnknown, nil, nil) + ret := NewMQMessage(config.ProtocolDefault, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeUnknown, nil, nil) return []*MQMessage{ret} } diff --git a/cdc/sink/codec/maxwell.go b/cdc/sink/codec/maxwell.go index 9be0f7ba305..6eba2fed71a 100644 --- a/cdc/sink/codec/maxwell.go +++ b/cdc/sink/codec/maxwell.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" model2 "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -287,7 +288,7 @@ func (d *MaxwellEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage return nil, errors.Trace(err) } - return newDDLMQMessage(ProtocolMaxwell, key, value, e), nil + return newDDLMQMessage(config.ProtocolMaxwell, key, value, e), nil } // Build implements the EventBatchEncoder interface @@ -296,7 +297,7 @@ func (d 
*MaxwellEventBatchEncoder) Build() []*MQMessage { return nil } - ret := NewMQMessage(ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil) + ret := NewMQMessage(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil) d.Reset() return []*MQMessage{ret} } diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 5a2bf4edbac..d766fc60fbd 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -49,7 +49,7 @@ type mqSink struct { dispatcher dispatcher.Dispatcher encoderBuilder codec.EncoderBuilder filter *filter.Filter - protocol codec.Protocol + protocol config.Protocol partitionNum int32 partitionInput []chan mqEvent @@ -64,15 +64,10 @@ type mqSink struct { func newMqSink( ctx context.Context, credential *security.Credential, mqProducer producer.Producer, - filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error, + filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error, ) (*mqSink, error) { - var protocol codec.Protocol - protocol.FromString(config.Sink.Protocol) - if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON) && !config.EnableOldValue { - log.Error("Old value is not enabled when using Canal protocol. 
Please update changefeed config") - return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("Canal requires old value to be enabled")) - } - + var protocol config.Protocol + protocol.FromString(replicaConfig.Sink.Protocol) encoderBuilder, err := codec.NewEventBatchEncoderBuilder(protocol, credential, opts) if err != nil { return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) @@ -83,7 +78,7 @@ func newMqSink( } partitionNum := mqProducer.GetPartitionNum() - d, err := dispatcher.NewDispatcher(config, partitionNum) + d, err := dispatcher.NewDispatcher(replicaConfig, partitionNum) if err != nil { return nil, errors.Trace(err) } @@ -372,6 +367,7 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage, } func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) { +<<<<<<< HEAD scheme := strings.ToLower(sinkURI.Scheme) if scheme != "kafka" && scheme != "kafka+ssl" { return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("can't create MQ sink with unsupported scheme: %s", scheme) @@ -379,17 +375,34 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi config := kafka.NewConfig() if err := config.Initialize(sinkURI, replicaConfig, opts); err != nil { +======= + producerConfig := kafka.NewConfig() + if err := producerConfig.CompleteByOpts(sinkURI, replicaConfig, opts); err != nil { +>>>>>>> bedf904e7 (config(ticdc): Fix old value configuration check for maxwell protocol (#3747)) return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } + // NOTICE: Please check after the completion, as we may get the configuration from the sinkURI. 
+ err := replicaConfig.Validate() + if err != nil { + return nil, err + } topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool { return r == '/' }) +<<<<<<< HEAD producer, err := kafka.NewKafkaSaramaProducer(ctx, sinkURI.Host, topic, config, errCh) +======= + if topic == "" { + return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri") + } + + sProducer, err := kafka.NewKafkaSaramaProducer(ctx, topic, producerConfig, errCh) +>>>>>>> bedf904e7 (config(ticdc): Fix old value configuration check for maxwell protocol (#3747)) if err != nil { return nil, errors.Trace(err) } - sink, err := newMqSink(ctx, config.Credential, producer, filter, replicaConfig, opts, errCh) + sink, err := newMqSink(ctx, producerConfig.Credential, sProducer, filter, replicaConfig, opts, errCh) if err != nil { return nil, errors.Trace(err) } @@ -415,8 +428,17 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, if s != "" { opts["max-batch-size"] = s } +<<<<<<< HEAD // For now, it's a place holder. Avro format have to make connection to Schema Registery, // and it may needs credential. +======= + err = replicaConfig.Validate() + if err != nil { + return nil, err + } + // For now, it's a placeholder. Avro format have to make connection to Schema Registry, + // and it may need credential. 
+>>>>>>> bedf904e7 (config(ticdc): Fix old value configuration check for maxwell protocol (#3747)) credential := &security.Credential{} sink, err := newMqSink(ctx, credential, producer, filter, replicaConfig, opts, errCh) if err != nil { diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index c49e4c8cb2e..a7283323198 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -205,9 +205,12 @@ func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Co } protocol := sinkURIParsed.Query().Get("protocol") + if protocol != "" { + cfg.Sink.Protocol = protocol + } for _, fp := range forceEnableOldValueProtocols { - if protocol == fp { - log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", protocol)) + if cfg.Sink.Protocol == fp { + log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", cfg.Sink.Protocol)) cfg.EnableOldValue = true break } @@ -259,7 +262,6 @@ func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Co // TODO(neil) enable ID bucket. } } - // Complete cfg. o.cfg = cfg @@ -272,6 +274,11 @@ func (o *createChangefeedOptions) validate(ctx context.Context, cmd *cobra.Comma return errors.New("Creating changefeed without a sink-uri") } + err := o.cfg.Validate() + if err != nil { + return err + } + if err := o.validateStartTs(ctx); err != nil { return err } diff --git a/pkg/config/mq_sink_protocol.go b/pkg/config/mq_sink_protocol.go new file mode 100644 index 00000000000..ff6ceeaed32 --- /dev/null +++ b/pkg/config/mq_sink_protocol.go @@ -0,0 +1,75 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "strings" + + "github.com/pingcap/log" + "go.uber.org/zap" +) + +// Protocol is the protocol of the mq message. +type Protocol int + +// Enum types of the Protocol. +const ( + ProtocolDefault Protocol = iota + ProtocolCanal + ProtocolAvro + ProtocolMaxwell + ProtocolCanalJSON + ProtocolCraft +) + +// FromString converts the protocol from string to Protocol enum type. +func (p *Protocol) FromString(protocol string) { + switch strings.ToLower(protocol) { + case "default": + *p = ProtocolDefault + case "canal": + *p = ProtocolCanal + case "avro": + *p = ProtocolAvro + case "maxwell": + *p = ProtocolMaxwell + case "canal-json": + *p = ProtocolCanalJSON + case "craft": + *p = ProtocolCraft + default: + *p = ProtocolDefault + log.Warn("can't support codec protocol, using default protocol", zap.String("protocol", protocol)) + } +} + +// String converts the Protocol enum type string to string. +func (p Protocol) String() string { + switch p { + case ProtocolDefault: + return "default" + case ProtocolCanal: + return "canal" + case ProtocolAvro: + return "avro" + case ProtocolMaxwell: + return "maxwell" + case ProtocolCanalJSON: + return "canal-json" + case ProtocolCraft: + return "craft" + default: + panic("unreachable") + } +} diff --git a/pkg/config/mq_sink_protocol_test.go b/pkg/config/mq_sink_protocol_test.go new file mode 100644 index 00000000000..4cfa8c1bbce --- /dev/null +++ b/pkg/config/mq_sink_protocol_test.go @@ -0,0 +1,100 @@ +// Copyright 2021 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFromString(t *testing.T) { + t.Parallel() + testCases := []struct { + protocol string + expectedProtocolEnum Protocol + }{ + { + protocol: "unknown", + expectedProtocolEnum: ProtocolDefault, + }, + { + protocol: "default", + expectedProtocolEnum: ProtocolDefault, + }, + { + protocol: "canal", + expectedProtocolEnum: ProtocolCanal, + }, + { + protocol: "canal-json", + expectedProtocolEnum: ProtocolCanalJSON, + }, + { + protocol: "maxwell", + expectedProtocolEnum: ProtocolMaxwell, + }, + { + protocol: "avro", + expectedProtocolEnum: ProtocolAvro, + }, + { + protocol: "craft", + expectedProtocolEnum: ProtocolCraft, + }, + } + + for _, tc := range testCases { + var protocol Protocol + protocol.FromString(tc.protocol) + require.Equal(t, tc.expectedProtocolEnum, protocol) + } +} + +func TestString(t *testing.T) { + t.Parallel() + testCases := []struct { + protocolEnum Protocol + expectedProtocol string + }{ + { + protocolEnum: ProtocolDefault, + expectedProtocol: "default", + }, + { + protocolEnum: ProtocolCanal, + expectedProtocol: "canal", + }, + { + protocolEnum: ProtocolCanalJSON, + expectedProtocol: "canal-json", + }, + { + protocolEnum: ProtocolMaxwell, + expectedProtocol: "maxwell", + }, + { + protocolEnum: ProtocolAvro, + expectedProtocol: "avro", + }, + { + protocolEnum: ProtocolCraft, + expectedProtocol: "craft", + }, + } + + for _, tc := range testCases { + require.Equal(t, 
tc.expectedProtocol, tc.protocolEnum.String()) + } +} diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go new file mode 100644 index 00000000000..d187f270ffc --- /dev/null +++ b/pkg/config/replica_config.go @@ -0,0 +1,146 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "encoding/json" + "fmt" + + "github.com/pingcap/ticdc/pkg/config/outdated" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + cerror "github.com/pingcap/ticdc/pkg/errors" + "go.uber.org/zap" +) + +var defaultReplicaConfig = &ReplicaConfig{ + CaseSensitive: true, + EnableOldValue: true, + CheckGCSafePoint: true, + Filter: &FilterConfig{ + Rules: []string{"*.*"}, + }, + Mounter: &MounterConfig{ + WorkerNum: 16, + }, + Sink: &SinkConfig{ + Protocol: "default", + }, + Cyclic: &CyclicConfig{ + Enable: false, + }, + Scheduler: &SchedulerConfig{ + Tp: "table-number", + PollingTime: -1, + }, + Consistent: &ConsistentConfig{ + Level: "none", + MaxLogSize: 64, + FlushIntervalInMs: 1000, + Storage: "", + }, +} + +// ReplicaConfig represents some addition replication config for a changefeed +type ReplicaConfig replicaConfig + +type replicaConfig struct { + CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"` + EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"` + ForceReplicate bool `toml:"force-replicate" json:"force-replicate"` + CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"` + Filter *FilterConfig `toml:"filter" 
json:"filter"` + Mounter *MounterConfig `toml:"mounter" json:"mounter"` + Sink *SinkConfig `toml:"sink" json:"sink"` + Cyclic *CyclicConfig `toml:"cyclic-replication" json:"cyclic-replication"` + Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"` + Consistent *ConsistentConfig `toml:"consistent" json:"consistent"` +} + +// Marshal returns the json marshal format of a ReplicationConfig +func (c *ReplicaConfig) Marshal() (string, error) { + cfg, err := json.Marshal(c) + if err != nil { + return "", cerror.WrapError(cerror.ErrEncodeFailed, errors.Annotatef(err, "Unmarshal data: %v", c)) + } + return string(cfg), nil +} + +// Unmarshal unmarshals into *ReplicationConfig from json marshal byte slice +func (c *ReplicaConfig) Unmarshal(data []byte) error { + return c.UnmarshalJSON(data) +} + +// UnmarshalJSON unmarshals into *ReplicationConfig from json marshal byte slice +func (c *ReplicaConfig) UnmarshalJSON(data []byte) error { + // The purpose of casting ReplicaConfig to replicaConfig is to avoid recursive calls UnmarshalJSON, + // resulting in stack overflow + r := (*replicaConfig)(c) + err := json.Unmarshal(data, &r) + if err != nil { + return cerror.WrapError(cerror.ErrDecodeFailed, err) + } + v1 := outdated.ReplicaConfigV1{} + err = v1.Unmarshal(data) + if err != nil { + return cerror.WrapError(cerror.ErrDecodeFailed, err) + } + r.fillFromV1(&v1) + return nil +} + +// Clone clones a replication +func (c *ReplicaConfig) Clone() *ReplicaConfig { + str, err := c.Marshal() + if err != nil { + log.Panic("failed to marshal replica config", + zap.Error(cerror.WrapError(cerror.ErrDecodeFailed, err))) + } + clone := new(ReplicaConfig) + err = clone.Unmarshal([]byte(str)) + if err != nil { + log.Panic("failed to unmarshal replica config", + zap.Error(cerror.WrapError(cerror.ErrDecodeFailed, err))) + } + return clone +} + +func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) { + if v1 == nil || v1.Sink == nil { + return + } + for _, dispatch := range 
v1.Sink.DispatchRules { + c.Sink.DispatchRules = append(c.Sink.DispatchRules, &DispatchRule{ + Matcher: []string{fmt.Sprintf("%s.%s", dispatch.Schema, dispatch.Name)}, + Dispatcher: dispatch.Rule, + }) + } +} + +// Validate verifies that each parameter is valid. +func (c *ReplicaConfig) Validate() error { + if c.Sink != nil { + err := c.Sink.validate(c.EnableOldValue) + if err != nil { + return err + } + } + return nil +} + +// GetDefaultReplicaConfig returns the default replica config. +func GetDefaultReplicaConfig() *ReplicaConfig { + return defaultReplicaConfig.Clone() +} diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go new file mode 100644 index 00000000000..d99d0efc22d --- /dev/null +++ b/pkg/config/replica_config_test.go @@ -0,0 +1,95 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package config
+
+import (
+	"bytes"
+	"encoding/json"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func mustIdentJSON(t *testing.T, j string) string {
+	var buf bytes.Buffer
+	err := json.Indent(&buf, []byte(j), "", " ")
+	require.Nil(t, err)
+	return buf.String()
+}
+
+func TestReplicaConfigMarshal(t *testing.T) {
+	t.Parallel()
+	conf := GetDefaultReplicaConfig()
+	conf.CaseSensitive = false
+	conf.ForceReplicate = true
+	conf.Filter.Rules = []string{"1.1"}
+	conf.Mounter.WorkerNum = 3
+	conf.Sink.ColumnSelectors = []*ColumnSelector{
+		{
+			Matcher: []string{"1.1"},
+			Columns: []string{"a", "b"},
+		},
+	}
+	b, err := conf.Marshal()
+	require.Nil(t, err)
+	require.Equal(t, testCfgTestReplicaConfigMarshal1, mustIdentJSON(t, b))
+	conf2 := new(ReplicaConfig)
+	err = conf2.Unmarshal([]byte(testCfgTestReplicaConfigMarshal2))
+	require.Nil(t, err)
+	require.Equal(t, conf, conf2)
+}
+
+func TestReplicaConfigClone(t *testing.T) {
+	t.Parallel()
+	conf := GetDefaultReplicaConfig()
+	conf.CaseSensitive = false
+	conf.ForceReplicate = true
+	conf.Filter.Rules = []string{"1.1"}
+	conf.Mounter.WorkerNum = 3
+	conf2 := conf.Clone()
+	require.Equal(t, conf, conf2)
+	conf2.Mounter.WorkerNum = 4
+	require.Equal(t, 3, conf.Mounter.WorkerNum)
+}
+
+func TestReplicaConfigOutDated(t *testing.T) {
+	t.Parallel()
+	conf2 := new(ReplicaConfig)
+	err := conf2.Unmarshal([]byte(testCfgTestReplicaConfigOutDated))
+	require.Nil(t, err)
+
+	conf := GetDefaultReplicaConfig()
+	conf.CaseSensitive = false
+	conf.ForceReplicate = true
+	conf.Filter.Rules = []string{"1.1"}
+	conf.Mounter.WorkerNum = 3
+	conf.Sink.DispatchRules = []*DispatchRule{
+		{Matcher: []string{"a.b"}, Dispatcher: "r1"},
+		{Matcher: []string{"a.c"}, Dispatcher: "r2"},
+		{Matcher: []string{"a.d"}, Dispatcher: "r2"},
+	}
+	require.Equal(t, conf, conf2)
+}
+
+func TestReplicaConfigValidate(t *testing.T) {
+	t.Parallel()
+	conf := GetDefaultReplicaConfig()
+	require.Nil(t, conf.Validate())
+
+	// Incorrect sink configuration.
+	conf = GetDefaultReplicaConfig()
+	conf.Sink.Protocol = "canal"
+	conf.EnableOldValue = false
+	require.Regexp(t, ".*canal protocol requires old value to be enabled.*", conf.Validate())
+}
diff --git a/pkg/config/sink.go b/pkg/config/sink.go
index 90fff12451d..b647c376aba 100644
--- a/pkg/config/sink.go
+++ b/pkg/config/sink.go
@@ -13,6 +13,14 @@
 
 package config
 
+import (
+	"fmt"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	cerror "github.com/pingcap/ticdc/pkg/errors"
+)
+
 // SinkConfig represents sink config for a changefeed
 type SinkConfig struct {
 	DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers"`
@@ -24,3 +32,28 @@ type DispatchRule struct {
 	Matcher    []string `toml:"matcher" json:"matcher"`
 	Dispatcher string   `toml:"dispatcher" json:"dispatcher"`
 }
+
+// ColumnSelector is a config entry carrying a matcher list and a column-name list.
+// NOTE(review): presumably selects the listed columns for matching tables — confirm.
+type ColumnSelector struct {
+	Matcher []string `toml:"matcher" json:"matcher"`
+	Columns []string `toml:"columns" json:"columns"`
+}
+
+// validate checks the sink protocol against the old value setting. It returns an
+// error wrapped with ErrKafkaInvalidConfig when old value is disabled and the
+// protocol is ProtocolCanal, ProtocolCanalJSON or ProtocolMaxwell; otherwise nil.
+func (s *SinkConfig) validate(enableOldValue bool) error {
+	protocol := s.Protocol
+	if !enableOldValue {
+		switch protocol {
+		case ProtocolCanal.String(), ProtocolCanalJSON.String(), ProtocolMaxwell.String():
+			log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+
+				"Please update changefeed config", protocol))
+			return cerror.WrapError(cerror.ErrKafkaInvalidConfig,
+				errors.Errorf("%s protocol requires old value to be enabled", protocol))
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/config/sink_test.go b/pkg/config/sink_test.go
new file mode 100644
index 00000000000..c34fcd832bf
--- /dev/null
+++ b/pkg/config/sink_test.go
@@ -0,0 +1,81 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+// TestValidate runs SinkConfig.validate over a protocol / old-value matrix.
+func TestValidate(t *testing.T) {
+	t.Parallel()
+	cases := []struct {
+		protocol       string
+		enableOldValue bool
+		expectedErr    string
+	}{
+		{
+			protocol:       "default",
+			enableOldValue: false,
+			expectedErr:    "",
+		},
+		{
+			protocol:       "default",
+			enableOldValue: true,
+			expectedErr:    "",
+		},
+		{
+			protocol:       "canal-json",
+			enableOldValue: false,
+			expectedErr:    ".*canal-json protocol requires old value to be enabled.*",
+		},
+		{
+			protocol:       "canal-json",
+			enableOldValue: true,
+			expectedErr:    "",
+		},
+		{
+			protocol:       "canal",
+			enableOldValue: false,
+			expectedErr:    ".*canal protocol requires old value to be enabled.*",
+		},
+		{
+			protocol:       "canal",
+			enableOldValue: true,
+			expectedErr:    "",
+		},
+		{
+			protocol:       "maxwell",
+			enableOldValue: false,
+			expectedErr:    ".*maxwell protocol requires old value to be enabled.*",
+		},
+		{
+			protocol:       "maxwell",
+			enableOldValue: true,
+			expectedErr:    "",
+		},
+	}
+
+	for _, c := range cases {
+		sink := SinkConfig{Protocol: c.protocol}
+		err := sink.validate(c.enableOldValue)
+		if c.expectedErr == "" {
+			require.Nil(t, err)
+			continue
+		}
+		require.Regexp(t, c.expectedErr, err)
+	}
+}