diff --git a/example.js b/example.js index 2a3df14..543245b 100644 --- a/example.js +++ b/example.js @@ -1,4 +1,4 @@ -import {check} from 'k6'; +import { check } from 'k6'; import pubsub from 'k6/x/pubsub'; export default function () { @@ -15,7 +15,7 @@ export default function () { trace: true, doNotCreateTopicIfMissing: false }); - let error = pubsub.publish(client, 'topic_name', 'message data'); + let error = pubsub.publish(client, 'topic_name', 'message data', {key:"value",key2:"value2"}); check(error, { "is sent": err => err === null diff --git a/go.mod b/go.mod index e3c2933..2b462a1 100644 --- a/go.mod +++ b/go.mod @@ -7,4 +7,5 @@ require ( github.com/ThreeDotsLabs/watermill-googlecloud v1.0.6 github.com/mitchellh/mapstructure v1.1.2 go.k6.io/k6 v0.32.0 + google.golang.org/api v0.30.0 ) diff --git a/pubsub.go b/pubsub.go index 7115bb1..faa5887 100644 --- a/pubsub.go +++ b/pubsub.go @@ -76,7 +76,7 @@ func (ps *PubSub) Publisher(config map[string]interface{}) *googlecloud.Publishe // Publish publishes a message to the provided topic using provided // googlecloud.Publisher. The msg value must be passed as string // and will be converted to bytes sequence before publishing. -func (ps *PubSub) Publish(ctx context.Context, p *googlecloud.Publisher, topic, msg string) error { +func (ps *PubSub) Publish(ctx context.Context, p *googlecloud.Publisher, topic, msg string, attributes map[string]string) error { state := lib.GetState(ctx) if state == nil { @@ -85,9 +85,11 @@ func (ps *PubSub) Publish(ctx context.Context, p *googlecloud.Publisher, topic, return err } + m := message.NewMessage(watermill.NewShortUUID(), []byte(msg)) + m.Metadata = attributes err := p.Publish( topic, - message.NewMessage(watermill.NewShortUUID(), []byte(msg)), + m, ) if err != nil {