Skip to content

Commit

Permalink
minor fixes and formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
moshebe committed Apr 22, 2021
1 parent 5c69807 commit f4b17f0
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 28 deletions.
5 changes: 5 additions & 0 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ builds:
id: dkron-executor-kafka
binary: dkron-executor-kafka

- <<: *xbuild
main: ./builtin/bins/dkron-executor-gcppubsub/
id: dkron-executor-gcppubsub
binary: dkron-executor-gcppubsub

- <<: *xbuild
main: ./builtin/bins/dkron-processor-files/
id: dkron-processor-files
Expand Down
34 changes: 7 additions & 27 deletions builtin/bins/dkron-executor-gcppubsub/gcppubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ type GCPPubSub struct {
// Execute Process method of the plugin
// "executor": "gcppubsub",
// "executor_config": {
// "project": "project-id",
// "topic": "topic-name",
// "data": "aGVsbG8gd29ybGQ=" // Optional. base64 encoded data
// "attributes": "{\"hello\":\"world\",\"waka\":\"paka\"}" // JSON serialized attributes
// "project": "project-id",
// "topic": "topic-name",
// "data": "aGVsbG8gd29ybGQ=" // Optional. base64 encoded data
// "attributes": "{\"hello\":\"world\",\"waka\":\"paka\"}" // JSON serialized attributes
// }
func (g *GCPPubSub) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) (*dktypes.ExecuteResponse, error) {
out, err := g.ExecuteImpl(args)
Expand All @@ -37,8 +37,6 @@ func (g *GCPPubSub) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) {
ctx := context.Background()
projectID := args.Config["project"]
topicName := args.Config["topic"]
encodedData := args.Config["data"]
attributesJSON := args.Config["attributes"]

if projectID == "" {
return nil, fmt.Errorf("missing project")
Expand All @@ -48,27 +46,9 @@ func (g *GCPPubSub) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) {
return nil, fmt.Errorf("missing topic")
}

if attributesJSON == "" && encodedData == "" {
return nil, fmt.Errorf("at least one of these fields should be set 'attributes, data'")
}

msg := &pubsub.Message{}

var attributes map[string]string
if attributesJSON != "" {
err := json.Unmarshal([]byte(attributesJSON), &attributes)
if err != nil {
return nil, fmt.Errorf("invalid attributes JSON: %w", err)
}
msg.Attributes = attributes
}

if encodedData != "" {
data, err := base64.StdEncoding.DecodeString(encodedData)
if err != nil {
return nil, fmt.Errorf("invalid encoded data: %w", err)
}
msg.Data = data
msg, err := ConfigToPubSubMessage(args.Config)
if err != nil{
return nil, fmt.Errorf("convert config to pubsub message: %w", err)
}

client, err := pubsub.NewClient(ctx, projectID)
Expand Down
3 changes: 2 additions & 1 deletion builtin/bins/dkron-executor-gcppubsub/gcppubsub_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package main

import (
"testing"

"cloud.google.com/go/pubsub"
"github.com/stretchr/testify/assert"
"testing"
)

func TestConfigToPubSubMessage(t *testing.T) {
Expand Down

0 comments on commit f4b17f0

Please sign in to comment.