Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config(ticdc): Add alias for "dispatcher" in dispatch rules #5441

Merged
merged 12 commits into from
May 17, 2022
4 changes: 2 additions & 2 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) {
}
for _, dispatch := range v1.Sink.DispatchRules {
c.Sink.DispatchRules = append(c.Sink.DispatchRules, &DispatchRule{
Matcher: []string{fmt.Sprintf("%s.%s", dispatch.Schema, dispatch.Name)},
PartitionRule: dispatch.Rule,
Matcher: []string{fmt.Sprintf("%s.%s", dispatch.Schema, dispatch.Name)},
DispatcherRule: dispatch.Rule,
})
}
}
Expand Down
26 changes: 23 additions & 3 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func TestReplicaConfigOutDated(t *testing.T) {
conf.Mounter.WorkerNum = 3
conf.Sink.Protocol = "open-protocol"
conf.Sink.DispatchRules = []*DispatchRule{
{Matcher: []string{"a.b"}, PartitionRule: "r1"},
{Matcher: []string{"a.c"}, PartitionRule: "r2"},
{Matcher: []string{"a.d"}, PartitionRule: "r2"},
{Matcher: []string{"a.b"}, DispatcherRule: "r1"},
{Matcher: []string{"a.c"}, DispatcherRule: "r2"},
{Matcher: []string{"a.d"}, DispatcherRule: "r2"},
}
require.Equal(t, conf, conf2)
}
Expand All @@ -94,4 +94,24 @@ func TestReplicaConfigValidate(t *testing.T) {
conf.Sink.Protocol = "canal"
conf.EnableOldValue = false
require.Regexp(t, ".*canal protocol requires old value to be enabled.*", conf.Validate())

conf = GetDefaultReplicaConfig()
conf.Sink.DispatchRules = []*DispatchRule{
{Matcher: []string{"a.b"}, DispatcherRule: "d1", PartitionRule: "r1"},
}
require.Regexp(t, ".*dispatcher and partition cannot be configured both.*", conf.Validate())

// Correct sink configuration.
conf = GetDefaultReplicaConfig()
conf.Sink.DispatchRules = []*DispatchRule{
{Matcher: []string{"a.b"}, DispatcherRule: "d1"},
{Matcher: []string{"a.c"}, PartitionRule: "p1"},
{Matcher: []string{"a.d"}},
}
err := conf.Validate()
require.Nil(t, err)
rules := conf.Sink.DispatchRules
require.Equal(t, "d1", rules[0].PartitionRule)
require.Equal(t, "p1", rules[1].PartitionRule)
require.Equal(t, "", rules[2].PartitionRule)
}
18 changes: 15 additions & 3 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

// DefaultMaxMessageBytes sets the default value for max-message-bytes
Expand All @@ -40,9 +41,10 @@ type SinkConfig struct {

// DispatchRule represents partition rule for a table
type DispatchRule struct {
Matcher []string `toml:"matcher" json:"matcher"`
PartitionRule string `toml:"dispatcher" json:"dispatcher"`
TopicRule string `toml:"topic" json:"topic"`
Matcher []string `toml:"matcher" json:"matcher"`
DispatcherRule string `toml:"dispatcher" json:"dispatcher"`
zhaoxinyu marked this conversation as resolved.
Show resolved Hide resolved
PartitionRule string `toml:"partition" json:"partition"`
TopicRule string `toml:"topic" json:"topic"`
}

// ColumnSelector represents a column selector for a table.
Expand All @@ -62,6 +64,16 @@ func (s *SinkConfig) validate(enableOldValue bool) error {
}
}
}
for _, rule := range s.DispatchRules {
if rule.DispatcherRule != "" && rule.PartitionRule != "" {
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
log.Error("dispatcher and partition cannot be configured both", zap.Any("rule", rule))
return cerror.WrapError(cerror.ErrSinkInvalidConfig,
errors.New(fmt.Sprintf("dispatcher and partition cannot be configured both for rule:%v", rule)))
}
if rule.DispatcherRule != "" {
zhaoxinyu marked this conversation as resolved.
Show resolved Hide resolved
rule.PartitionRule = rule.DispatcherRule
}
}

return nil
}