Skip to content

Commit

Permalink
pkg/sink(ticdc): iterate all Kafka configs to support KOP (#8893)
Browse files Browse the repository at this point in the history
close #8892
  • Loading branch information
Rustin170506 authored May 7, 2023
1 parent 58b465a commit c9d276f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 16 deletions.
21 changes: 13 additions & 8 deletions pkg/sink/kafka/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,21 @@ func (a *saramaAdminClient) GetBrokerConfig(
return "", err
}

if len(configEntries) == 0 || configEntries[0].Name != configName {
log.Warn("Kafka config item not found",
zap.String("namespace", a.changefeed.Namespace),
zap.String("changefeed", a.changefeed.ID),
zap.String("configName", configName))
return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", configName)
// For compatibility with KOP, we checked all return values.
// 1. Kafka only returns requested configs.
// 2. Kop returns all configs.
for _, entry := range configEntries {
if entry.Name == configName {
return entry.Value, nil
}
}

return configEntries[0].Value, nil
log.Warn("Kafka config item not found",
zap.String("namespace", a.changefeed.Namespace),
zap.String("changefeed", a.changefeed.ID),
zap.String("configName", configName))
return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", configName)
}

func (a *saramaAdminClient) GetTopicsPartitions(_ context.Context) (map[string]int32, error) {
Expand Down
20 changes: 12 additions & 8 deletions pkg/sink/kafka/v2/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,25 @@ func (a *admin) GetBrokerConfig(ctx context.Context, configName string) (string,
}

if len(resp.Resources) == 0 || len(resp.Resources[0].ConfigEntries) == 0 {
log.Warn("kafka config item not found",
log.Warn("Kafka config item not found",
zap.String("configName", configName))
return "", errors.ErrKafkaBrokerConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", configName)
}

entry := resp.Resources[0].ConfigEntries[0]
if entry.ConfigName != configName {
log.Warn("kafka config item not found",
zap.String("configName", configName))
return "", errors.ErrKafkaBrokerConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", configName)
// For compatibility with KOP, we checked all return values.
// 1. Kafka only returns requested configs.
// 2. Kop returns all configs.
for _, entry := range resp.Resources[0].ConfigEntries {
if entry.ConfigName == configName {
return entry.ConfigValue, nil
}
}

return entry.ConfigValue, nil
log.Warn("Kafka config item not found",
zap.String("configName", configName))
return "", errors.ErrKafkaBrokerConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", configName)
}

func (a *admin) GetTopicsPartitions(ctx context.Context) (map[string]int32, error) {
Expand Down

0 comments on commit c9d276f

Please sign in to comment.