Skip to content

Commit

Permalink
fix(kafka): fix dynamic prop problem
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Nov 21, 2024
1 parent 56df9c5 commit 19705f0
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
6 changes: 3 additions & 3 deletions extensions/impl/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (k *KafkaSink) buildMsg(ctx api.StreamContext, item api.MessageTuple, decod
if len(k.kc.Key) > 0 {
newKey := k.kc.Key
if dp, ok := item.(api.HasDynamicProps); ok {
key, ok := dp.DynamicProps("key")
key, ok := dp.DynamicProps(k.kc.Key)
if ok {
newKey = key
}
Expand Down Expand Up @@ -256,7 +256,7 @@ func (k *KafkaSink) parseHeaders(ctx api.StreamContext, item api.MessageTuple) (
value := v
dp, ok := item.(api.HasDynamicProps)
if ok {
nv, ok := dp.DynamicProps(k)
nv, ok := dp.DynamicProps(v)
if ok {
value = nv
}
Expand All @@ -271,7 +271,7 @@ func (k *KafkaSink) parseHeaders(ctx api.StreamContext, item api.MessageTuple) (
raw := k.headerTemplate
dp, ok := item.(api.HasDynamicProps)
if ok {
nv, ok := dp.DynamicProps("headers")
nv, ok := dp.DynamicProps(k.headerTemplate)
if ok {
raw = nv
}
Expand Down
4 changes: 2 additions & 2 deletions extensions/impl/kafka/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestKafkaSinkBuildMsg(t *testing.T) {
d, _ := json.Marshal(item)
mockT := testx.MockTuple{
Map: item,
Template: map[string]string{"a": "1", "key": "1"},
Template: map[string]string{"{{.a}}": "1"},
}
msg, err := ks.buildMsg(ctx, mockT, d)
require.NoError(t, err)
Expand All @@ -141,7 +141,7 @@ func TestKafkaSinkBuildMsg(t *testing.T) {
}))
mockT = testx.MockTuple{
Map: item,
Template: map[string]string{"headers": "{\"a\":\"1\"}"},
Template: map[string]string{"{\"a\":\"{{.a}}\"}": "{\"a\":\"1\"}"},
}
msg, err = ks.buildMsg(ctx, mockT, d)
require.NoError(t, err)
Expand Down

0 comments on commit 19705f0

Please sign in to comment.