Skip to content

Commit

Permalink
fix(kafka): fix dynamic prop problem
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Nov 21, 2024
1 parent 3875ce3 commit 35e6c51
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions extensions/impl/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (k *KafkaSink) buildMsg(ctx api.StreamContext, item api.MessageTuple, decod
if len(k.kc.Key) > 0 {
newKey := k.kc.Key
if dp, ok := item.(api.HasDynamicProps); ok {
key, ok := dp.DynamicProps("key")
key, ok := dp.DynamicProps(k.kc.Key)
if ok {
newKey = key
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func (k *KafkaSink) parseHeaders(ctx api.StreamContext, item api.MessageTuple) (
value := v
dp, ok := item.(api.HasDynamicProps)
if ok {
nv, ok := dp.DynamicProps(k)
nv, ok := dp.DynamicProps(v)
if ok {
value = nv
}
Expand All @@ -267,7 +267,7 @@ func (k *KafkaSink) parseHeaders(ctx api.StreamContext, item api.MessageTuple) (
raw := k.headerTemplate
dp, ok := item.(api.HasDynamicProps)
if ok {
nv, ok := dp.DynamicProps("headers")
nv, ok := dp.DynamicProps(k.headerTemplate)
if ok {
raw = nv
}
Expand Down

0 comments on commit 35e6c51

Please sign in to comment.