diff --git a/cdc/sink/codec/avro.go b/cdc/sink/mq/codec/avro.go similarity index 100% rename from cdc/sink/codec/avro.go rename to cdc/sink/mq/codec/avro.go diff --git a/cdc/sink/codec/avro_test.go b/cdc/sink/mq/codec/avro_test.go similarity index 100% rename from cdc/sink/codec/avro_test.go rename to cdc/sink/mq/codec/avro_test.go diff --git a/cdc/sink/codec/avro_test_data.go b/cdc/sink/mq/codec/avro_test_data.go similarity index 100% rename from cdc/sink/codec/avro_test_data.go rename to cdc/sink/mq/codec/avro_test_data.go diff --git a/cdc/sink/codec/canal.go b/cdc/sink/mq/codec/canal.go similarity index 100% rename from cdc/sink/codec/canal.go rename to cdc/sink/mq/codec/canal.go diff --git a/cdc/sink/codec/canal_flat.go b/cdc/sink/mq/codec/canal_flat.go similarity index 100% rename from cdc/sink/codec/canal_flat.go rename to cdc/sink/mq/codec/canal_flat.go diff --git a/cdc/sink/codec/canal_flat_test.go b/cdc/sink/mq/codec/canal_flat_test.go similarity index 100% rename from cdc/sink/codec/canal_flat_test.go rename to cdc/sink/mq/codec/canal_flat_test.go diff --git a/cdc/sink/codec/canal_test.go b/cdc/sink/mq/codec/canal_test.go similarity index 100% rename from cdc/sink/codec/canal_test.go rename to cdc/sink/mq/codec/canal_test.go diff --git a/cdc/sink/codec/codec_test.go b/cdc/sink/mq/codec/codec_test.go similarity index 99% rename from cdc/sink/codec/codec_test.go rename to cdc/sink/mq/codec/codec_test.go index 4880a78cc98..83e8ff93226 100644 --- a/cdc/sink/codec/codec_test.go +++ b/cdc/sink/mq/codec/codec_test.go @@ -21,7 +21,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sink/codec/craft" + "github.com/pingcap/tiflow/cdc/sink/mq/codec/craft" "github.com/pingcap/tiflow/proto/benchmark" "github.com/stretchr/testify/require" ) diff --git a/cdc/sink/codec/config.go b/cdc/sink/mq/codec/config.go similarity index 100% rename from cdc/sink/codec/config.go rename to cdc/sink/mq/codec/config.go diff --git a/cdc/sink/codec/config_test.go b/cdc/sink/mq/codec/config_test.go similarity index 100% rename from cdc/sink/codec/config_test.go rename to cdc/sink/mq/codec/config_test.go diff --git a/cdc/sink/codec/craft.go b/cdc/sink/mq/codec/craft.go similarity index 99% rename from cdc/sink/codec/craft.go rename to cdc/sink/mq/codec/craft.go index 4645cbece9d..51dbb165a3b 100644 --- a/cdc/sink/codec/craft.go +++ b/cdc/sink/mq/codec/craft.go @@ -18,7 +18,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sink/codec/craft" + "github.com/pingcap/tiflow/cdc/sink/mq/codec/craft" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" ) diff --git a/cdc/sink/codec/craft/buffer.go b/cdc/sink/mq/codec/craft/buffer.go similarity index 100% rename from cdc/sink/codec/craft/buffer.go rename to cdc/sink/mq/codec/craft/buffer.go diff --git a/cdc/sink/codec/craft/codec_test.go b/cdc/sink/mq/codec/craft/codec_test.go similarity index 100% rename from cdc/sink/codec/craft/codec_test.go rename to cdc/sink/mq/codec/craft/codec_test.go diff --git a/cdc/sink/codec/craft/decoder.go b/cdc/sink/mq/codec/craft/decoder.go similarity index 100% rename from cdc/sink/codec/craft/decoder.go rename to cdc/sink/mq/codec/craft/decoder.go diff --git a/cdc/sink/codec/craft/encoder.go b/cdc/sink/mq/codec/craft/encoder.go similarity index 100% rename from cdc/sink/codec/craft/encoder.go rename to cdc/sink/mq/codec/craft/encoder.go diff --git a/cdc/sink/codec/craft/model.go b/cdc/sink/mq/codec/craft/model.go similarity index 100% rename from cdc/sink/codec/craft/model.go rename to cdc/sink/mq/codec/craft/model.go diff --git a/cdc/sink/codec/craft_test.go b/cdc/sink/mq/codec/craft_test.go similarity index 100% rename from cdc/sink/codec/craft_test.go rename to cdc/sink/mq/codec/craft_test.go diff --git a/cdc/sink/mq/codec/decoder.go b/cdc/sink/mq/codec/decoder.go new file mode 100644 index 00000000000..7e49769dd97 --- /dev/null +++ b/cdc/sink/mq/codec/decoder.go @@ -0,0 +1,32 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import "github.com/pingcap/tiflow/cdc/model" + +// EventBatchDecoder is an abstraction for events decoder +// this interface is only for testing now +type EventBatchDecoder interface { + // HasNext returns + // 1. the type of the next event + // 2. a bool if the next event is exist + // 3. error + HasNext() (model.MqMessageType, bool, error) + // NextResolvedEvent returns the next resolved event if exists + NextResolvedEvent() (uint64, error) + // NextRowChangedEvent returns the next row changed event if exists + NextRowChangedEvent() (*model.RowChangedEvent, error) + // NextDDLEvent returns the next DDL event if exists + NextDDLEvent() (*model.DDLEvent, error) +} diff --git a/cdc/sink/mq/codec/encoder.go b/cdc/sink/mq/codec/encoder.go new file mode 100644 index 00000000000..9d7b6de540d --- /dev/null +++ b/cdc/sink/mq/codec/encoder.go @@ -0,0 +1,63 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import ( + "context" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" +) + +// EventBatchEncoder is an abstraction for events encoder +type EventBatchEncoder interface { + // EncodeCheckpointEvent appends a checkpoint event into the batch. + // This event will be broadcast to all partitions to signal a global checkpoint. + EncodeCheckpointEvent(ts uint64) (*MQMessage, error) + // AppendRowChangedEvent appends the calling context, a row changed event and the dispatch + // topic into the batch + AppendRowChangedEvent(context.Context, string, *model.RowChangedEvent) error + // EncodeDDLEvent appends a DDL event into the batch + EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) + // Build builds the batch and returns the bytes of key and value. + Build() []*MQMessage + // Size returns the size of the batch(bytes) + Size() int +} + +// EncoderBuilder builds encoder with context. +type EncoderBuilder interface { + Build() EventBatchEncoder +} + +// NewEventBatchEncoderBuilder returns an EncoderBuilder +func NewEventBatchEncoderBuilder(ctx context.Context, c *Config) (EncoderBuilder, error) { + switch c.protocol { + case config.ProtocolDefault, config.ProtocolOpen: + return newJSONEventBatchEncoderBuilder(c), nil + case config.ProtocolCanal: + return newCanalEventBatchEncoderBuilder(), nil + case config.ProtocolAvro: + return newAvroEventBatchEncoderBuilder(ctx, c) + case config.ProtocolMaxwell: + return newMaxwellEventBatchEncoderBuilder(), nil + case config.ProtocolCanalJSON: + return newCanalFlatEventBatchEncoderBuilder(c), nil + case config.ProtocolCraft: + return newCraftEventBatchEncoderBuilder(c), nil + default: + return nil, cerror.ErrMQSinkUnknownProtocol.GenWithStackByArgs(c.protocol) + } +} diff --git a/cdc/sink/codec/java.go b/cdc/sink/mq/codec/java.go similarity index 100% rename from cdc/sink/codec/java.go rename to cdc/sink/mq/codec/java.go diff --git a/cdc/sink/codec/json.go b/cdc/sink/mq/codec/json.go similarity index 100% rename from cdc/sink/codec/json.go rename to cdc/sink/mq/codec/json.go diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/mq/codec/json_test.go similarity index 100% rename from cdc/sink/codec/json_test.go rename to cdc/sink/mq/codec/json_test.go diff --git a/cdc/sink/codec/main_test.go b/cdc/sink/mq/codec/main_test.go similarity index 100% rename from cdc/sink/codec/main_test.go rename to cdc/sink/mq/codec/main_test.go diff --git a/cdc/sink/codec/maxwell.go b/cdc/sink/mq/codec/maxwell.go similarity index 100% rename from cdc/sink/codec/maxwell.go rename to cdc/sink/mq/codec/maxwell.go diff --git a/cdc/sink/codec/maxwell_test.go b/cdc/sink/mq/codec/maxwell_test.go similarity index 100% rename from cdc/sink/codec/maxwell_test.go rename to cdc/sink/mq/codec/maxwell_test.go diff --git a/cdc/sink/codec/interface.go b/cdc/sink/mq/codec/message.go similarity index 56% rename from cdc/sink/codec/interface.go rename to cdc/sink/mq/codec/message.go index 4436be7d9f4..a7033b60100 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/mq/codec/message.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. +// Copyright 2022 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,32 +14,14 @@ package codec import ( - "context" "encoding/binary" "time" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/tikv/client-go/v2/oracle" ) -// EventBatchEncoder is an abstraction for events encoder -type EventBatchEncoder interface { - // EncodeCheckpointEvent appends a checkpoint event into the batch. - // This event will be broadcast to all partitions to signal a global checkpoint. - EncodeCheckpointEvent(ts uint64) (*MQMessage, error) - // AppendRowChangedEvent appends the calling context, a row changed event and the dispatch - // topic into the batch - AppendRowChangedEvent(context.Context, string, *model.RowChangedEvent) error - // EncodeDDLEvent appends a DDL event into the batch - EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) - // Build builds the batch and returns the bytes of key and value. - Build() []*MQMessage - // Size returns the size of the batch(bytes) - Size() int -} - // MQMessage represents an MQ message to the mqSink type MQMessage struct { Key []byte @@ -133,53 +115,3 @@ func NewMQMessage( return ret } - -// EventBatchDecoder is an abstraction for events decoder -// this interface is only for testing now -type EventBatchDecoder interface { - // HasNext returns - // 1. the type of the next event - // 2. a bool if the next event is exist - // 3. error - HasNext() (model.MqMessageType, bool, error) - // NextResolvedEvent returns the next resolved event if exists - NextResolvedEvent() (uint64, error) - // NextRowChangedEvent returns the next row changed event if exists - NextRowChangedEvent() (*model.RowChangedEvent, error) - // NextDDLEvent returns the next DDL event if exists - NextDDLEvent() (*model.DDLEvent, error) -} - -// EncoderResult indicates an action request by the encoder to the mqSink -type EncoderResult uint8 - -// Enum types of EncoderResult -const ( - EncoderNeedAsyncWrite EncoderResult = iota - EncoderNeedSyncWrite -) - -// EncoderBuilder builds encoder with context. -type EncoderBuilder interface { - Build() EventBatchEncoder -} - -// NewEventBatchEncoderBuilder returns an EncoderBuilder -func NewEventBatchEncoderBuilder(ctx context.Context, c *Config) (EncoderBuilder, error) { - switch c.protocol { - case config.ProtocolDefault, config.ProtocolOpen: - return newJSONEventBatchEncoderBuilder(c), nil - case config.ProtocolCanal: - return newCanalEventBatchEncoderBuilder(), nil - case config.ProtocolAvro: - return newAvroEventBatchEncoderBuilder(ctx, c) - case config.ProtocolMaxwell: - return newMaxwellEventBatchEncoderBuilder(), nil - case config.ProtocolCanalJSON: - return newCanalFlatEventBatchEncoderBuilder(c), nil - case config.ProtocolCraft: - return newCraftEventBatchEncoderBuilder(c), nil - default: - return nil, cerror.ErrMQSinkUnknownProtocol.GenWithStackByArgs(c.protocol) - } -} diff --git a/cdc/sink/codec/interface_test.go b/cdc/sink/mq/codec/message_test.go similarity index 99% rename from cdc/sink/codec/interface_test.go rename to cdc/sink/mq/codec/message_test.go index 5af0007270f..99fb8c2e25f 100644 --- a/cdc/sink/codec/interface_test.go +++ b/cdc/sink/mq/codec/message_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 PingCAP, Inc. +// Copyright 2022 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/cdc/sink/codec/schema_registry.go b/cdc/sink/mq/codec/schema_registry.go similarity index 100% rename from cdc/sink/codec/schema_registry.go rename to cdc/sink/mq/codec/schema_registry.go diff --git a/cdc/sink/codec/schema_registry_test.go b/cdc/sink/mq/codec/schema_registry_test.go similarity index 100% rename from cdc/sink/codec/schema_registry_test.go rename to cdc/sink/mq/codec/schema_registry_test.go diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go index 5487631a821..4294cde7752 100644 --- a/cdc/sink/mq/mq.go +++ b/cdc/sink/mq/mq.go @@ -24,8 +24,8 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sink/codec" "github.com/pingcap/tiflow/cdc/sink/metrics" + "github.com/pingcap/tiflow/cdc/sink/mq/codec" "github.com/pingcap/tiflow/cdc/sink/mq/dispatcher" "github.com/pingcap/tiflow/cdc/sink/mq/manager" kafkamanager "github.com/pingcap/tiflow/cdc/sink/mq/manager/kafka" diff --git a/cdc/sink/mq/mq_flush_worker.go b/cdc/sink/mq/mq_flush_worker.go index c467ae0d8df..3e86fd285c3 100644 --- a/cdc/sink/mq/mq_flush_worker.go +++ b/cdc/sink/mq/mq_flush_worker.go @@ -20,8 +20,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sink/codec" "github.com/pingcap/tiflow/cdc/sink/metrics" + "github.com/pingcap/tiflow/cdc/sink/mq/codec" "github.com/pingcap/tiflow/cdc/sink/mq/producer" cerror "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/zap" diff --git a/cdc/sink/mq/mq_flush_worker_test.go b/cdc/sink/mq/mq_flush_worker_test.go index c8ba0adc460..67d93864762 100644 --- a/cdc/sink/mq/mq_flush_worker_test.go +++ b/cdc/sink/mq/mq_flush_worker_test.go @@ -20,8 +20,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sink/codec" "github.com/pingcap/tiflow/cdc/sink/metrics" + "github.com/pingcap/tiflow/cdc/sink/mq/codec" "github.com/pingcap/tiflow/pkg/config" "github.com/stretchr/testify/require" ) diff --git a/cdc/sink/mq/mq_test.go b/cdc/sink/mq/mq_test.go index e01e6c53daf..f049d9428aa 100644 --- a/cdc/sink/mq/mq_test.go +++ b/cdc/sink/mq/mq_test.go @@ -23,7 +23,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sink/codec" + "github.com/pingcap/tiflow/cdc/sink/mq/codec" kafkap "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" diff --git a/cdc/sink/mq/producer/kafka/config_test.go b/cdc/sink/mq/producer/kafka/config_test.go index a99084396c1..bfc55b9ed33 100644 --- a/cdc/sink/mq/producer/kafka/config_test.go +++ b/cdc/sink/mq/producer/kafka/config_test.go @@ -24,7 +24,7 @@ import ( "github.com/Shopify/sarama" "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/contextutil" - "github.com/pingcap/tiflow/cdc/sink/codec" + "github.com/pingcap/tiflow/cdc/sink/mq/codec" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/kafka" diff --git a/cdc/sink/mq/producer/kafka/kafka.go b/cdc/sink/mq/producer/kafka/kafka.go index 6145ab141e9..6f64a5afab3 100644 --- a/cdc/sink/mq/producer/kafka/kafka.go +++ b/cdc/sink/mq/producer/kafka/kafka.go @@ -28,7 +28,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sink/codec" + "github.com/pingcap/tiflow/cdc/sink/mq/codec" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/kafka" "github.com/pingcap/tiflow/pkg/util" diff --git a/cdc/sink/mq/producer/kafka/kafka_test.go b/cdc/sink/mq/producer/kafka/kafka_test.go index 1b35ceb3dcc..59286e6a2e2 100644 --- a/cdc/sink/mq/producer/kafka/kafka_test.go +++ b/cdc/sink/mq/producer/kafka/kafka_test.go @@ -25,7 +25,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sink/codec" + "github.com/pingcap/tiflow/cdc/sink/mq/codec" "github.com/pingcap/tiflow/pkg/kafka" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" diff --git a/cdc/sink/mq/producer/mq_producer.go b/cdc/sink/mq/producer/mq_producer.go index ba11c3c62e8..ff79842b4d8 100644 --- a/cdc/sink/mq/producer/mq_producer.go +++ b/cdc/sink/mq/producer/mq_producer.go @@ -16,7 +16,7 @@ package producer import ( "context" - "github.com/pingcap/tiflow/cdc/sink/codec" + "github.com/pingcap/tiflow/cdc/sink/mq/codec" ) // Producer is an interface of mq producer diff --git a/cdc/sink/mq/producer/pulsar/producer.go b/cdc/sink/mq/producer/pulsar/producer.go index 8f1a5a5ab4b..c2e4823f14f 100644 --- a/cdc/sink/mq/producer/pulsar/producer.go +++ b/cdc/sink/mq/producer/pulsar/producer.go @@ -21,7 +21,7 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/pingcap/failpoint" "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/sink/codec" + "github.com/pingcap/tiflow/cdc/sink/mq/codec" cerror "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/zap" ) diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 9fbe12b782a..e33f54e5906 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -36,7 +36,7 @@ import ( "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink" - "github.com/pingcap/tiflow/cdc/sink/codec" + "github.com/pingcap/tiflow/cdc/sink/mq/codec" "github.com/pingcap/tiflow/cdc/sink/mq/dispatcher" cmdUtil "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/config"