Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Base64-encoding/decoding parameters for Google Cloud PubSub plugins #5543

Merged
merged 3 commits into from
Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/inputs/cloud_pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ and creates metrics using one of the supported [input data formats][].
## 1. Note this setting does not limit the number of messages that can be
## processed concurrently (use "max_outstanding_messages" instead).
# max_receiver_go_routines = 0

## Optional. If true, Telegraf will attempt to base64 decode the
## PubSub message data before parsing. Many GCP services that
## output JSON to Google PubSub base64-encode the JSON payload.
# base64_data = false
```

### Multiple Subscriptions and Topics
Expand Down
20 changes: 19 additions & 1 deletion plugins/inputs/cloud_pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"cloud.google.com/go/pubsub"
"encoding/base64"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
Expand Down Expand Up @@ -40,6 +41,8 @@ type PubSub struct {
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
RetryReceiveDelaySeconds int `toml:"retry_delay_seconds"`

Base64Data bool `toml:"base64_data"`

sub subscription
stubSub func() subscription

Expand Down Expand Up @@ -169,7 +172,18 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error {
return fmt.Errorf("message longer than max_message_len (%d > %d)", len(msg.Data()), ps.MaxMessageLen)
}

metrics, err := ps.parser.Parse(msg.Data())
var data []byte
if ps.Base64Data {
strData, err := base64.StdEncoding.DecodeString(string(msg.Data()))
if err != nil {
return fmt.Errorf("unable to base64 decode message: %v", err)
}
data = []byte(strData)
} else {
data = msg.Data()
}

metrics, err := ps.parser.Parse(data)
if err != nil {
msg.Ack()
return err
Expand Down Expand Up @@ -345,4 +359,8 @@ const sampleConfig = `
## 1. Note this setting does not limit the number of messages that can be
## processed concurrently (use "max_outstanding_messages" instead).
# max_receiver_go_routines = 0

## Optional. If true, Telegraf will attempt to base64 decode the
## PubSub message data before parsing
# base64_data = false
`
45 changes: 45 additions & 0 deletions plugins/inputs/cloud_pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cloud_pubsub

import (
"encoding/base64"
"errors"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
Expand Down Expand Up @@ -55,6 +56,50 @@ func TestRunParse(t *testing.T) {
validateTestInfluxMetric(t, metric)
}

// Test ingesting InfluxDB-format PubSub message
func TestRunBase64(t *testing.T) {
subId := "sub-run-base64"

testParser, _ := parsers.NewInfluxParser()

sub := &stubSub{
id: subId,
messages: make(chan *testMsg, 100),
}
sub.receiver = testMessagesReceive(sub)

ps := &PubSub{
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subId,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Base64Data: true,
}

acc := &testutil.Accumulator{}
if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
defer ps.Stop()

if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}

testTracker := &testTracker{}
msg := &testMsg{
value: base64.StdEncoding.EncodeToString([]byte(msgInflux)),
tracker: testTracker,
}
sub.messages <- msg

acc.Wait(1)
assert.Equal(t, acc.NFields(), 1)
metric := acc.Metrics[0]
validateTestInfluxMetric(t, metric)
}

func TestRunInvalidMessages(t *testing.T) {
subId := "sub-invalid-messages"

Expand Down
3 changes: 3 additions & 0 deletions plugins/outputs/cloud_pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ generate it using `telegraf --usage cloud_pubsub`.
## Optional. Specifies a timeout for requests to the PubSub API.
# publish_timeout = "30s"

## Optional. If true, published PubSub message data will be base64-encoded.
# base64_data = false

## Optional. PubSub attributes to add to metrics.
# [[inputs.pubsub.attributes]]
# my_attr = "tag_value"
Expand Down
17 changes: 17 additions & 0 deletions plugins/outputs/cloud_pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"cloud.google.com/go/pubsub"
"encoding/base64"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
Expand Down Expand Up @@ -56,6 +57,9 @@ const sampleConfig = `
## Optional. Specifies a timeout for requests to the PubSub API.
# publish_timeout = "30s"

## Optional. If true, published PubSub message data will be base64-encoded.
# base64_data = false

## Optional. PubSub attributes to add to metrics.
# [[inputs.pubsub.attributes]]
# my_attr = "tag_value"
Expand All @@ -72,6 +76,7 @@ type PubSub struct {
PublishByteThreshold int `toml:"publish_byte_threshold"`
PublishNumGoroutines int `toml:"publish_num_go_routines"`
PublishTimeout internal.Duration `toml:"publish_timeout"`
Base64Data bool `toml:"base64_data"`

t topic
c *pubsub.Client
Expand Down Expand Up @@ -207,6 +212,12 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro
if err != nil {
return nil, err
}

if ps.Base64Data {
encoded := base64.StdEncoding.EncodeToString(b)
b = []byte(encoded)
}

msg := &pubsub.Message{Data: b}
if ps.Attributes != nil {
msg.Attributes = ps.Attributes
Expand All @@ -220,6 +231,12 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro
if err != nil {
return nil, err
}

if ps.Base64Data {
encoded := base64.StdEncoding.EncodeToString(b)
b = []byte(encoded)
}

msgs[i] = &pubsub.Message{
Data: b,
}
Expand Down
50 changes: 43 additions & 7 deletions plugins/outputs/cloud_pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"cloud.google.com/go/pubsub"
"encoding/base64"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
Expand All @@ -26,7 +27,7 @@ func TestPubSub_WriteSingle(t *testing.T) {
}

for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published)
verifyRawMetricPublished(t, testM.m, topic.published)
}
}

Expand All @@ -48,7 +49,7 @@ func TestPubSub_WriteWithAttribute(t *testing.T) {
}

for _, testM := range testMetrics {
msg := verifyMetricPublished(t, testM.m, topic.published)
msg := verifyRawMetricPublished(t, testM.m, topic.published)
assert.Equalf(t, "bar1", msg.Attributes["foo1"], "expected attribute foo1=bar1")
assert.Equalf(t, "bar2", msg.Attributes["foo2"], "expected attribute foo2=bar2")
}
Expand All @@ -70,7 +71,7 @@ func TestPubSub_WriteMultiple(t *testing.T) {
}

for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published)
verifyRawMetricPublished(t, testM.m, topic.published)
}
assert.Equalf(t, 1, topic.bundleCount, "unexpected bundle count")
}
Expand All @@ -94,7 +95,7 @@ func TestPubSub_WriteOverCountThreshold(t *testing.T) {
}

for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published)
verifyRawMetricPublished(t, testM.m, topic.published)
}
assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count")
}
Expand All @@ -117,11 +118,33 @@ func TestPubSub_WriteOverByteThreshold(t *testing.T) {
}

for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published)
verifyRawMetricPublished(t, testM.m, topic.published)
}
assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count")
}

func TestPubSub_WriteBase64Single(t *testing.T) {

testMetrics := []testMetric{
{testutil.TestMetric("value_1", "test"), false /*return error */},
{testutil.TestMetric("value_2", "test"), false},
}

settings := pubsub.DefaultPublishSettings
settings.CountThreshold = 1
ps, topic, metrics := getTestResources(t, settings, testMetrics)
ps.Base64Data = true

err := ps.Write(metrics)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}

for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published, true /* base64encoded */)
}
}

func TestPubSub_Error(t *testing.T) {
testMetrics := []testMetric{
// Force this batch to return error
Expand All @@ -141,7 +164,11 @@ func TestPubSub_Error(t *testing.T) {
}
}

func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message) *pubsub.Message {
func verifyRawMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message) *pubsub.Message {
return verifyMetricPublished(t, m, published, false)
}

func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message, base64Encoded bool) *pubsub.Message {
p, _ := parsers.NewInfluxParser()

v, _ := m.GetField("value")
Expand All @@ -150,7 +177,16 @@ func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string
t.Fatalf("expected metric to get published (value: %s)", v.(string))
}

parsed, err := p.Parse(psMsg.Data)
data := psMsg.Data
if base64Encoded {
v, err := base64.StdEncoding.DecodeString(string(psMsg.Data))
if err != nil {
t.Fatalf("Unable to decode expected base64-encoded message: %s", err)
}
data = []byte(v)
}

parsed, err := p.Parse(data)
if err != nil {
t.Fatalf("could not parse influxdb metric from published message: %s", string(psMsg.Data))
}
Expand Down
12 changes: 11 additions & 1 deletion plugins/outputs/cloud_pubsub/topic_stubbed.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"cloud.google.com/go/pubsub"
"encoding/base64"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers"
Expand Down Expand Up @@ -180,8 +181,17 @@ func (t *stubTopic) parseIDs(msg *pubsub.Message) []string {
p, _ := parsers.NewInfluxParser()
metrics, err := p.Parse(msg.Data)
if err != nil {
t.Fatalf("unexpected parsing error: %v", err)
// Just attempt to base64-decode first before returning error.
d, err := base64.StdEncoding.DecodeString(string(msg.Data))
if err != nil {
t.Errorf("unable to base64-decode potential test message: %v", err)
}
metrics, err = p.Parse(d)
if err != nil {
t.Fatalf("unexpected parsing error: %v", err)
}
}

ids := make([]string, len(metrics))
for i, met := range metrics {
id, _ := met.GetField("value")
Expand Down