Skip to content

Commit

Permalink
feat(mqtt): v5 sink supports properties
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Nov 21, 2024
1 parent 56df9c5 commit 2bdf43f
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 6 deletions.
25 changes: 19 additions & 6 deletions internal/io/mqtt/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ import (

// AdConf is the advanced configuration for the mqtt sink
type AdConf struct {
Tpc string `json:"topic"`
Qos byte `json:"qos"`
Retained bool `json:"retained"`
SelId string `json:"connectionSelector"`
Tpc string `json:"topic"`
Qos byte `json:"qos"`
Retained bool `json:"retained"`
SelId string `json:"connectionSelector"`
Props map[string]string `json:"properties"`
PVersion string `json:"protocolVersion"`
}

type Sink struct {
Expand All @@ -42,7 +44,7 @@ type Sink struct {
cli *Connection
}

func (ms *Sink) Provision(_ api.StreamContext, ps map[string]any) error {
func (ms *Sink) Provision(ctx api.StreamContext, ps map[string]any) error {
err := ValidateConfig(ps)
if err != nil {
return err
Expand All @@ -63,6 +65,9 @@ func (ms *Sink) Provision(_ api.StreamContext, ps map[string]any) error {
}
ms.config = ps
ms.adconf = adconf
if adconf.PVersion != "5" && adconf.Props != nil {
ctx.GetLogger().Warnf("Only mqtt v5 supports properties, ignore the properties setting")
}
return nil
}

Expand Down Expand Up @@ -96,15 +101,23 @@ func validateMQTTSinkTopic(topic string) error {

func (ms *Sink) Collect(ctx api.StreamContext, item api.RawTuple) error {
tpc := ms.adconf.Tpc
props := ms.adconf.Props
// If tpc supports dynamic props(template), planner will guarantee the result has the parsed dynamic props
if dp, ok := item.(api.HasDynamicProps); ok {
temp, transformed := dp.DynamicProps(tpc)
if transformed {
tpc = temp
}
for k, v := range props {
nv, ok := dp.DynamicProps(v)
if ok {
props[k] = nv
}
}
}

ctx.GetLogger().Debugf("publishing to topic %s", tpc)
return ms.cli.Publish(ctx, tpc, ms.adconf.Qos, ms.adconf.Retained, item.Raw(), nil)
return ms.cli.Publish(ctx, tpc, ms.adconf.Qos, ms.adconf.Retained, item.Raw(), props)
}

func (ms *Sink) Close(ctx api.StreamContext) error {
Expand Down
3 changes: 3 additions & 0 deletions internal/io/mqtt/source_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func TestSourceSinkRecon(t *testing.T) {
"topic": "demo",
"qos": 0,
"retained": false,
"properties": map[string]string{
"invalid": "v3",
},
})
assert.NoError(t, err)
err = server.Close()
Expand Down
3 changes: 3 additions & 0 deletions internal/io/mqtt/v5_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ func TestV5SourceSinkRecon(t *testing.T) {
"qos": 0,
"protocolVersion": "5",
"retained": false,
"properties": map[string]string{
"prop2": "val2",
},
})
assert.NoError(t, err)
})
Expand Down

0 comments on commit 2bdf43f

Please sign in to comment.