Skip to content

Commit

Permalink
Merge branch 'master' into dead-dml
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu committed Sep 6, 2023
2 parents 002149b + e1730e5 commit bd44ce3
Show file tree
Hide file tree
Showing 11 changed files with 308 additions and 64 deletions.
22 changes: 15 additions & 7 deletions metrics/grafana/TiCDC-Monitor-Summary.json
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@
},
{
"datasource": "${DS_TEST-CLUSTER}",
"description": "The status of each changefeed.\n\n0: Normal\n\n1: Error\n\n2: Failed\n\n3: Stopped\n\n4: Finished\n\n-1: Unknown",
"description": "The status of each changefeed.\n\n0: Normal\n\n1 and 6: Warning\n\n2: Failed\n\n3: Stopped\n\n4: Finished\n\n-1: Unknown",
"fieldConfig": {
"defaults": {
"color": {
Expand Down Expand Up @@ -748,7 +748,7 @@
{
"from": "",
"id": 2,
"text": "Error",
"text": "Warning",
"to": "",
"type": 1,
"value": "1"
Expand Down Expand Up @@ -780,22 +780,30 @@
{
"from": "",
"id": 6,
"text": "Warning",
"to": "",
"type": 1,
"value": "6"
},
{
"from": "",
"id": 7,
"text": "Unknown",
"to": "",
"type": 1,
"value": "-1"
},
{
"from": "5",
"id": 7,
"from": "7",
"id": 8,
"text": "Other",
"to": "10000",
"type": 1,
"value": "5"
"value": "7"
},
{
"from": "6",
"id": 8,
"from": "7",
"id": 9,
"text": "-",
"to": "1000",
"type": 2
Expand Down
24 changes: 16 additions & 8 deletions metrics/grafana/ticdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(increase(tikv_cdc_scan_duration_seconds_sum{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$tikv_instance\"}[1m])) by (type, instance)",
"expr": "sum(increase(tikv_cdc_scan_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$tikv_instance\"}[1m])) by (type, instance)",
"format": "time_series",
"hide": false,
"intervalFactor": 1,
Expand Down Expand Up @@ -3900,7 +3900,7 @@
},
{
"datasource": "${DS_TEST-CLUSTER}",
"description": "The status of each changefeed.\n\n0: Normal\n\n1: Error\n\n2: Failed\n\n3: Stopped\n\n4: Finished\n\n-1: Unknown",
"description": "The status of each changefeed.\n\n0: Normal\n\n1 and 6: Warning\n\n2: Failed\n\n3: Stopped\n\n4: Finished\n\n-1: Unknown",
"fieldConfig": {
"defaults": {
"color": {
Expand Down Expand Up @@ -3939,7 +3939,7 @@
{
"from": "",
"id": 2,
"text": "Error",
"text": "Warning",
"to": "",
"type": 1,
"value": "1"
Expand Down Expand Up @@ -3971,22 +3971,30 @@
{
"from": "",
"id": 6,
"text": "Warning",
"to": "",
"type": 1,
"value": "6"
},
{
"from": "",
"id": 7,
"text": "Unknown",
"to": "",
"type": 1,
"value": "-1"
},
{
"from": "5",
"id": 7,
"from": "7",
"id": 8,
"text": "Other",
"to": "10000",
"type": 1,
"value": "5"
"value": "7"
},
{
"from": "6",
"id": 8,
"from": "7",
"id": 9,
"text": "-",
"to": "1000",
"type": 2
Expand Down
4 changes: 2 additions & 2 deletions pkg/compression/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Encode(cc string, data []byte) ([]byte, error) {
default:
}

return nil, cerror.ErrCompressionFailed.GenWithStack("Unsupported compression %d", cc)
return nil, cerror.ErrCompressionFailed.GenWithStack("Unsupported compression %s", cc)
}

// Decode the given data by the given compression codec.
Expand All @@ -81,5 +81,5 @@ func Decode(cc string, data []byte) ([]byte, error) {
default:
}

return nil, cerror.ErrCompressionFailed.GenWithStack("Unsupported compression %d", cc)
return nil, cerror.ErrCompressionFailed.GenWithStack("Unsupported compression %s", cc)
}
21 changes: 13 additions & 8 deletions pkg/config/large_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,21 @@ func NewDefaultLargeMessageHandleConfig() *LargeMessageHandleConfig {
}
}

// Validate the Config.
func (c *LargeMessageHandleConfig) Validate(protocol Protocol, enableTiDBExtension bool) error {
// compression can be enabled independently
if c.LargeMessageHandleCompression != "" {
if !compression.Supported(c.LargeMessageHandleCompression) {
return cerror.ErrInvalidReplicaConfig.GenWithStack(
"large message handle compression is not supported, got %s", c.LargeMessageHandleCompression)
}
// AdjustAndValidate the Config.
func (c *LargeMessageHandleConfig) AdjustAndValidate(protocol Protocol, enableTiDBExtension bool) error {
if c.LargeMessageHandleOption == "" {
c.LargeMessageHandleOption = LargeMessageHandleOptionNone
}

if c.LargeMessageHandleCompression == "" {
c.LargeMessageHandleCompression = compression.None
}

// compression can be enabled independently
if !compression.Supported(c.LargeMessageHandleCompression) {
return cerror.ErrInvalidReplicaConfig.GenWithStack(
"large message handle compression is not supported, got %s", c.LargeMessageHandleCompression)
}
if c.LargeMessageHandleOption == LargeMessageHandleOptionNone {
return nil
}
Expand Down
24 changes: 12 additions & 12 deletions pkg/config/large_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ func TestLargeMessageHandle4Compression(t *testing.T) {
// unsupported compression, return error
largeMessageHandle.LargeMessageHandleCompression = "zstd"

err := largeMessageHandle.Validate(ProtocolCanalJSON, false)
err := largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false)
require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig)

largeMessageHandle.LargeMessageHandleCompression = compression.LZ4
err = largeMessageHandle.Validate(ProtocolCanalJSON, false)
err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false)
require.NoError(t, err)

largeMessageHandle.LargeMessageHandleCompression = compression.Snappy
err = largeMessageHandle.Validate(ProtocolCanalJSON, false)
err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false)
require.NoError(t, err)

largeMessageHandle.LargeMessageHandleCompression = compression.None
err = largeMessageHandle.Validate(ProtocolCanalJSON, false)
err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false)
require.NoError(t, err)
}

Expand All @@ -50,11 +50,11 @@ func TestLargeMessageHandle4NotSupportedProtocol(t *testing.T) {

largeMessageHandle := NewDefaultLargeMessageHandleConfig()

err := largeMessageHandle.Validate(ProtocolCanal, true)
err := largeMessageHandle.AdjustAndValidate(ProtocolCanal, true)
require.NoError(t, err)

largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionHandleKeyOnly
err = largeMessageHandle.Validate(ProtocolCanal, true)
err = largeMessageHandle.AdjustAndValidate(ProtocolCanal, true)
require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig)
}

Expand All @@ -64,7 +64,7 @@ func TestLargeMessageHandle4CanalJSON(t *testing.T) {
// large-message-handle not set, always no error
largeMessageHandle := NewDefaultLargeMessageHandleConfig()

err := largeMessageHandle.Validate(ProtocolCanalJSON, false)
err := largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false)
require.NoError(t, err)
require.True(t, largeMessageHandle.Disabled())

Expand All @@ -78,11 +78,11 @@ func TestLargeMessageHandle4CanalJSON(t *testing.T) {
}

// `enable-tidb-extension` is false, return error
err := largeMessageHandle.Validate(ProtocolCanalJSON, false)
err := largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false)
require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig)

// `enable-tidb-extension` is true, no error
err = largeMessageHandle.Validate(ProtocolCanalJSON, true)
err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, true)
require.NoError(t, err)
require.Equal(t, option, largeMessageHandle.LargeMessageHandleOption)
}
Expand All @@ -94,7 +94,7 @@ func TestLargeMessageHandle4OpenProtocol(t *testing.T) {
// large-message-handle not set, always no error
largeMessageHandle := NewDefaultLargeMessageHandleConfig()

err := largeMessageHandle.Validate(ProtocolOpen, false)
err := largeMessageHandle.AdjustAndValidate(ProtocolOpen, false)
require.NoError(t, err)
require.True(t, largeMessageHandle.Disabled())

Expand All @@ -108,11 +108,11 @@ func TestLargeMessageHandle4OpenProtocol(t *testing.T) {
}

// `enable-tidb-extension` is false, return error
err := largeMessageHandle.Validate(ProtocolOpen, false)
err := largeMessageHandle.AdjustAndValidate(ProtocolOpen, false)
require.NoError(t, err)

// `enable-tidb-extension` is true, no error
err = largeMessageHandle.Validate(ProtocolOpen, true)
err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, true)
require.NoError(t, err)
require.Equal(t, o, largeMessageHandle.LargeMessageHandleOption)

Expand Down
20 changes: 20 additions & 0 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/pingcap/tiflow/pkg/compression"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -325,3 +326,22 @@ func TestIsSinkCompatibleWithSpanReplication(t *testing.T) {
require.Equal(t, compatible, tt.compatible, tt.name)
}
}

func TestValidateAndAdjustLargeMessageHandle(t *testing.T) {
cfg := GetDefaultReplicaConfig()
cfg.Sink.KafkaConfig = &KafkaConfig{
LargeMessageHandle: NewDefaultLargeMessageHandleConfig(),
}
cfg.Sink.KafkaConfig.LargeMessageHandle.LargeMessageHandleOption = ""
cfg.Sink.KafkaConfig.LargeMessageHandle.LargeMessageHandleCompression = ""

rawURL := "kafka://127.0.0.1:9092/canal-json-test?protocol=canal-json&enable-tidb-extension=true"
sinkURL, err := url.Parse(rawURL)
require.NoError(t, err)

err = cfg.ValidateAndAdjust(sinkURL)
require.NoError(t, err)

require.Equal(t, LargeMessageHandleOptionNone, cfg.Sink.KafkaConfig.LargeMessageHandle.LargeMessageHandleOption)
require.Equal(t, compression.None, cfg.Sink.KafkaConfig.LargeMessageHandle.LargeMessageHandleCompression)
}
40 changes: 30 additions & 10 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package config
import (
"fmt"
"net/url"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -540,6 +541,33 @@ type CloudStorageConfig struct {
}

func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
if err := s.validateAndAdjustSinkURI(sinkURI); err != nil {
return err
}

if sink.IsMySQLCompatibleScheme(sinkURI.Scheme) {
return nil
}

protocol, _ := ParseSinkProtocolFromString(util.GetOrZero(s.Protocol))

if s.KafkaConfig != nil && s.KafkaConfig.LargeMessageHandle != nil {
var (
enableTiDBExtension bool
err error
)
if s := sinkURI.Query().Get("enable-tidb-extension"); s != "" {
enableTiDBExtension, err = strconv.ParseBool(s)
if err != nil {
return errors.Trace(err)
}
}
err = s.KafkaConfig.LargeMessageHandle.AdjustAndValidate(protocol, enableTiDBExtension)
if err != nil {
return err
}
}

if s.SchemaRegistry != nil &&
(s.KafkaConfig != nil && s.KafkaConfig.GlueSchemaRegistryConfig != nil) {
return cerror.ErrInvalidReplicaConfig.
Expand All @@ -548,21 +576,14 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
"schema-registry is used by confluent schema registry, " +
"glue-schema-registry-config is used by aws glue schema registry")
}

if s.KafkaConfig != nil && s.KafkaConfig.GlueSchemaRegistryConfig != nil {
err := s.KafkaConfig.GlueSchemaRegistryConfig.Validate()
if err != nil {
return err
}
}

if err := s.validateAndAdjustSinkURI(sinkURI); err != nil {
return err
}

if sink.IsMySQLCompatibleScheme(sinkURI.Scheme) {
return nil
}

if s.PulsarConfig != nil {
if err := s.PulsarConfig.validate(); err != nil {
return err
Expand Down Expand Up @@ -595,7 +616,6 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
s.Terminator = util.AddressOf(CRLF)
}

protocol, _ := ParseSinkProtocolFromString(util.GetOrZero(s.Protocol))
if util.GetOrZero(s.DeleteOnlyOutputHandleKeyColumns) && protocol == ProtocolCsv {
return cerror.ErrSinkInvalidConfig.GenWithStack(
"CSV protocol always output all columns for the delete event, " +
Expand Down Expand Up @@ -656,7 +676,7 @@ func (s *SinkConfig) validateAndAdjustSinkURI(sinkURI *url.URL) error {
return err
}

// Validate that protocol is compatible with the scheme. For testing purposes,
// Adjust that protocol is compatible with the scheme. For testing purposes,
// any protocol should be legal for blackhole.
if sink.IsMQScheme(sinkURI.Scheme) || sink.IsStorageScheme(sinkURI.Scheme) {
_, err := ParseSinkProtocolFromString(util.GetOrZero(s.Protocol))
Expand Down
4 changes: 2 additions & 2 deletions pkg/sink/codec/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er
c.IncludeCommitTs = replicaConfig.Sink.CSVConfig.IncludeCommitTs
c.BinaryEncodingMethod = replicaConfig.Sink.CSVConfig.BinaryEncodingMethod
}
if replicaConfig.Sink.KafkaConfig != nil {
if replicaConfig.Sink.KafkaConfig != nil && replicaConfig.Sink.KafkaConfig.LargeMessageHandle != nil {
c.LargeMessageHandle = replicaConfig.Sink.KafkaConfig.LargeMessageHandle
}
if !c.LargeMessageHandle.Disabled() && replicaConfig.ForceReplicate {
Expand Down Expand Up @@ -324,7 +324,7 @@ func (c *Config) Validate() error {
}

if c.LargeMessageHandle != nil {
err := c.LargeMessageHandle.Validate(c.Protocol, c.EnableTiDBExtension)
err := c.LargeMessageHandle.AdjustAndValidate(c.Protocol, c.EnableTiDBExtension)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit bd44ce3

Please sign in to comment.