Skip to content

Commit

Permalink
config(ticdc): Add alias for "dispatcher" in dispatch rules (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaoxinyu authored and sdojjy committed May 18, 2022
1 parent ec7d396 commit 2c40521
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 84 deletions.
4 changes: 2 additions & 2 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ func TestFillV1(t *testing.T) {
},
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{Matcher: []string{"test.tbl3"}, PartitionRule: "ts"},
{Matcher: []string{"test.tbl4"}, PartitionRule: "rowid"},
{Matcher: []string{"test.tbl3"}, DispatcherRule: "ts"},
{Matcher: []string{"test.tbl4"}, DispatcherRule: "rowid"},
},
},
Cyclic: &config.CyclicConfig{
Expand Down
4 changes: 3 additions & 1 deletion cdc/sink/mysql/txn_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ func (c *unresolvedTxnCache) Resolved(

func splitResolvedTxn(
resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs,
) (checkpointTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) {
) (checkpointTsMap map[model.TableID]uint64,
resolvedRowsMap map[model.TableID][]*model.SingleTableTxn,
) {
var (
ok bool
txnsLength int
Expand Down
5 changes: 5 additions & 0 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ var doc = `{
"type": "object",
"properties": {
"dispatcher": {
"description": "Deprecated, please use PartitionRule.",
"type": "string"
},
"matcher": {
Expand All @@ -731,6 +732,10 @@ var doc = `{
"type": "string"
}
},
"partition": {
"description": "PartitionRule is an alias added for DispatcherRule to mitigate confusions.\nIn the future release, the DispatcherRule is expected to be removed .",
"type": "string"
},
"topic": {
"type": "string"
}
Expand Down
5 changes: 5 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@
"type": "object",
"properties": {
"dispatcher": {
"description": "Deprecated, please use PartitionRule.",
"type": "string"
},
"matcher": {
Expand All @@ -712,6 +713,10 @@
"type": "string"
}
},
"partition": {
"description": "PartitionRule is an alias added for DispatcherRule to mitigate confusions.\nIn the future release, the DispatcherRule is expected to be removed .",
"type": "string"
},
"topic": {
"type": "string"
}
Expand Down
6 changes: 6 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ definitions:
config.DispatchRule:
properties:
dispatcher:
description: Deprecated, please use PartitionRule.
type: string
matcher:
items:
type: string
type: array
partition:
description: |-
PartitionRule is an alias added for DispatcherRule to mitigate confusions.
In the future release, the DispatcherRule is expected to be removed .
type: string
topic:
type: string
type: object
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/util/changefeed.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ worker-num = 16
# For MQ Sinks, you can configure event distribution rules through dispatchers
# Dispatchers support default, ts, rowid and table
dispatchers = [
{ matcher = ['test1.*', 'test2.*'], dispatcher = "ts", topic = "hello_{schema}" },
{ matcher = ['test1.*', 'test2.*'], partition = "ts", topic = "hello_{schema}" },
{ matcher = ['test3.*', 'test4.*'], dispatcher = "rowid", topic = "{schema}_world" },
]
# 对于 MQ 类的 Sink,可以通过 column-selectors 配置 column 选择器
Expand Down
151 changes: 80 additions & 71 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,17 @@ package util
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
)

func TestSuite(t *testing.T) { check.TestingT(t) }

type utilsSuite struct{}

var _ = check.Suite(&utilsSuite{})

func (s *utilsSuite) TestProxyFields(c *check.C) {
defer testleak.AfterTest(c)()
func TestProxyFields(t *testing.T) {
revIndex := map[string]int{
"http_proxy": 0,
"https_proxy": 1,
Expand All @@ -46,65 +39,74 @@ func (s *utilsSuite) TestProxyFields(c *check.C) {
// Each bit of the mask decided whether this index of `envs` would be set.
for mask := 0; mask <= 0b111; mask++ {
for _, env := range envs {
c.Assert(os.Unsetenv(env), check.IsNil)
require.Nil(t, os.Unsetenv(env))
}

for i := 0; i < 3; i++ {
if (1<<i)&mask != 0 {
c.Assert(os.Setenv(envs[i], envPreset[i]), check.IsNil)
require.Nil(t, os.Setenv(envs[i], envPreset[i]))
}
}

for _, field := range findProxyFields() {
idx, ok := revIndex[field.Key]
c.Assert(ok, check.IsTrue)
c.Assert((1<<idx)&mask, check.Not(check.Equals), 0)
c.Assert(field.String, check.Equals, envPreset[idx])
require.True(t, ok)
require.NotEqual(t, 0, (1<<idx)&mask)
require.Equal(t, field.String, envPreset[idx])
}
}
}

func (s *utilsSuite) TestVerifyPdEndpoint(c *check.C) {
defer testleak.AfterTest(c)()
func TestVerifyPdEndpoint(t *testing.T) {
// empty URL.
url := ""
c.Assert(VerifyPdEndpoint(url, false), check.ErrorMatches, ".*PD endpoint should be a valid http or https URL.*")
require.Regexp(t, ".*PD endpoint should be a valid http or https URL.*",
VerifyPdEndpoint(url, false))

// invalid URL.
url = "\n hi"
c.Assert(VerifyPdEndpoint(url, false), check.ErrorMatches, ".*invalid control character in URL.*")
require.Regexp(t, ".*invalid control character in URL.*",
VerifyPdEndpoint(url, false))

// http URL without host.
url = "http://"
c.Assert(VerifyPdEndpoint(url, false), check.ErrorMatches, ".*PD endpoint should be a valid http or https URL.*")
require.Regexp(t, ".*PD endpoint should be a valid http or https URL.*",
VerifyPdEndpoint(url, false))

// https URL without host.
url = "https://"
c.Assert(VerifyPdEndpoint(url, false), check.ErrorMatches, ".*PD endpoint should be a valid http or https URL.*")
require.Regexp(t, ".*PD endpoint should be a valid http or https URL.*",
VerifyPdEndpoint(url, false))

// postgres scheme.
url = "postgres://postgres@localhost/cargo_registry"
c.Assert(VerifyPdEndpoint(url, false), check.ErrorMatches, ".*PD endpoint should be a valid http or https URL.*")
require.Regexp(t, ".*PD endpoint should be a valid http or https URL.*",
VerifyPdEndpoint(url, false))

// https scheme without TLS.
url = "https://aa"
c.Assert(VerifyPdEndpoint(url, false), check.ErrorMatches, ".*PD endpoint scheme is https, please provide certificate.*")
require.Regexp(t, ".*PD endpoint scheme is https, please provide certificate.*",
VerifyPdEndpoint(url, false))

// http scheme with TLS.
url = "http://aa"
c.Assert(VerifyPdEndpoint(url, true), check.ErrorMatches, ".*PD endpoint scheme should be https.*")
require.Regexp(t, ".*PD endpoint scheme should be https.*", VerifyPdEndpoint(url, true))

// valid http URL.
c.Assert(VerifyPdEndpoint("http://aa", false), check.IsNil)
require.Nil(t, VerifyPdEndpoint("http://aa", false))

// valid https URL with TLS.
c.Assert(VerifyPdEndpoint("https://aa", true), check.IsNil)
require.Nil(t, VerifyPdEndpoint("https://aa", true))
}

func (s *utilsSuite) TestStrictDecodeValidFile(c *check.C) {
defer testleak.AfterTest(c)()
dataDir := c.MkDir()
tmpDir := c.MkDir()
func TestStrictDecodeValidFile(t *testing.T) {
dataDir, err := ioutil.TempDir("", "data")
require.NoError(t, err)
tmpDir, err := ioutil.TempDir("", "tmp")
require.NoError(t, err)
defer os.RemoveAll(dataDir)
defer os.RemoveAll(tmpDir)

configPath := filepath.Join(tmpDir, "ticdc.toml")
configContent := fmt.Sprintf(`
addr = "128.0.0.1:1234"
Expand Down Expand Up @@ -140,18 +142,22 @@ cert-path = "bb"
key-path = "cc"
cert-allowed-cn = ["dd","ee"]
`, dataDir)
err := os.WriteFile(configPath, []byte(configContent), 0o644)
c.Assert(err, check.IsNil)
err = os.WriteFile(configPath, []byte(configContent), 0o644)
require.Nil(t, err)

conf := config.GetDefaultServerConfig()
err = StrictDecodeFile(configPath, "test", conf)
c.Assert(err, check.IsNil)
require.Nil(t, err)
}

func (s *utilsSuite) TestStrictDecodeInvalidFile(c *check.C) {
defer testleak.AfterTest(c)()
dataDir := c.MkDir()
tmpDir := c.MkDir()
func TestStrictDecodeInvalidFile(t *testing.T) {
dataDir, err := ioutil.TempDir("", "data")
require.NoError(t, err)
tmpDir, err := ioutil.TempDir("", "tmp")
require.NoError(t, err)
defer os.RemoveAll(dataDir)
defer os.RemoveAll(tmpDir)

configPath := filepath.Join(tmpDir, "ticdc.toml")
configContent := fmt.Sprintf(`
unknown = "128.0.0.1:1234"
Expand All @@ -162,29 +168,30 @@ max-size = 200
max-days = 1
max-backups = 1
`, dataDir)
err := os.WriteFile(configPath, []byte(configContent), 0o644)
c.Assert(err, check.IsNil)
err = os.WriteFile(configPath, []byte(configContent), 0o644)
require.Nil(t, err)

conf := config.GetDefaultServerConfig()
err = StrictDecodeFile(configPath, "test", conf)
c.Assert(err, check.ErrorMatches, ".*contained unknown configuration options.*")
require.Contains(t, err.Error(), "contained unknown configuration options")
}

func (s *utilsSuite) TestAndWriteExampleReplicaTOML(c *check.C) {
defer testleak.AfterTest(c)()
func TestAndWriteExampleReplicaTOML(t *testing.T) {
cfg := config.GetDefaultReplicaConfig()
err := StrictDecodeFile("changefeed.toml", "cdc", &cfg)
c.Assert(err, check.IsNil)
require.Nil(t, err)

c.Assert(cfg.CaseSensitive, check.IsTrue)
c.Assert(cfg.Filter, check.DeepEquals, &config.FilterConfig{
require.True(t, cfg.CaseSensitive)
require.Equal(t, &config.FilterConfig{
IgnoreTxnStartTs: []uint64{1, 2},
Rules: []string{"*.*", "!test.*"},
})
c.Assert(cfg.Mounter, check.DeepEquals, &config.MounterConfig{
}, cfg.Filter)
require.Equal(t, &config.MounterConfig{
WorkerNum: 16,
})
c.Assert(cfg.Sink, check.DeepEquals, &config.SinkConfig{
}, cfg.Mounter)
err = cfg.Validate()
require.Nil(t, err)
require.Equal(t, &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{PartitionRule: "ts", TopicRule: "hello_{schema}", Matcher: []string{"test1.*", "test2.*"}},
{PartitionRule: "rowid", TopicRule: "{schema}_world", Matcher: []string{"test3.*", "test4.*"}},
Expand All @@ -194,28 +201,26 @@ func (s *utilsSuite) TestAndWriteExampleReplicaTOML(c *check.C) {
{Matcher: []string{"test3.*", "test4.*"}, Columns: []string{"!a", "column3"}},
},
Protocol: "open-protocol",
})
c.Assert(cfg.Cyclic, check.DeepEquals, &config.CyclicConfig{
}, cfg.Sink)
require.Equal(t, &config.CyclicConfig{
Enable: false,
ReplicaID: 1,
FilterReplicaID: []uint64{2, 3},
SyncDDL: true,
})
}, cfg.Cyclic)
}

func (s *utilsSuite) TestAndWriteExampleServerTOML(c *check.C) {
defer testleak.AfterTest(c)()
func TestAndWriteExampleServerTOML(t *testing.T) {
cfg := config.GetDefaultServerConfig()
err := StrictDecodeFile("ticdc.toml", "cdc", &cfg)
c.Assert(err, check.IsNil)
require.Nil(t, err)
defcfg := config.GetDefaultServerConfig()
defcfg.AdvertiseAddr = "127.0.0.1:8300"
defcfg.LogFile = "/tmp/ticdc/ticdc.log"
c.Assert(cfg, check.DeepEquals, defcfg)
require.Equal(t, defcfg, cfg)
}

func (s *utilsSuite) TestJSONPrint(c *check.C) {
defer testleak.AfterTest(c)()
func TestJSONPrint(t *testing.T) {
cmd := new(cobra.Command)
type testStruct struct {
A string `json:"a"`
Expand All @@ -229,19 +234,23 @@ func (s *utilsSuite) TestJSONPrint(c *check.C) {
cmd.SetOut(&b)

err := JSONPrint(cmd, &data)
c.Assert(err, check.IsNil)
require.Nil(t, err)

output := `{
"a": "string"
}
`
c.Assert(b.String(), check.Equals, output)
require.Equal(t, output, b.String())
}

func (s *utilsSuite) TestIgnoreStrictCheckItem(c *check.C) {
defer testleak.AfterTest(c)()
dataDir := c.MkDir()
tmpDir := c.MkDir()
func TestIgnoreStrictCheckItem(t *testing.T) {
dataDir, err := ioutil.TempDir("", "data")
require.NoError(t, err)
tmpDir, err := ioutil.TempDir("", "tmp")
require.NoError(t, err)
defer os.RemoveAll(dataDir)
defer os.RemoveAll(tmpDir)

configPath := filepath.Join(tmpDir, "ticdc.toml")
configContent := fmt.Sprintf(`
data-dir = "%+v"
Expand All @@ -250,12 +259,12 @@ max-size = 200
max-days = 1
max-backups = 1
`, dataDir)
err := os.WriteFile(configPath, []byte(configContent), 0o644)
c.Assert(err, check.IsNil)
err = os.WriteFile(configPath, []byte(configContent), 0o644)
require.Nil(t, err)

conf := config.GetDefaultServerConfig()
err = StrictDecodeFile(configPath, "test", conf, "unknown")
c.Assert(err, check.IsNil)
require.Nil(t, err)

configContent = fmt.Sprintf(`
data-dir = "%+v"
Expand All @@ -269,19 +278,19 @@ max-days = 1
max-backups = 1
`, dataDir)
err = os.WriteFile(configPath, []byte(configContent), 0o644)
c.Assert(err, check.IsNil)
require.Nil(t, err)

err = StrictDecodeFile(configPath, "test", conf, "unknown")
c.Assert(err, check.ErrorMatches, ".*contained unknown configuration options: unknown2.*")
require.Contains(t, err.Error(), "contained unknown configuration options: unknown2")

configContent = fmt.Sprintf(`
data-dir = "%+v"
[debug]
unknown = 1
`, dataDir)
err = os.WriteFile(configPath, []byte(configContent), 0o644)
c.Assert(err, check.IsNil)
require.Nil(t, err)

err = StrictDecodeFile(configPath, "test", conf, "debug")
c.Assert(err, check.IsNil)
require.Nil(t, err)
}
Loading

0 comments on commit 2c40521

Please sign in to comment.