Skip to content

Commit

Permalink
Add Base64-encoding/decoding for Google Cloud PubSub plugins (influxd…
Browse files Browse the repository at this point in the history
  • Loading branch information
emilymye authored and Mathieu Lecarme committed Apr 17, 2020
1 parent bf2d029 commit dd5f03e
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 9 deletions.
5 changes: 5 additions & 0 deletions plugins/inputs/cloud_pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ and creates metrics using one of the supported [input data formats][].
## 1. Note this setting does not limit the number of messages that can be
## processed concurrently (use "max_outstanding_messages" instead).
# max_receiver_go_routines = 0

## Optional. If true, Telegraf will attempt to base64 decode the
## PubSub message data before parsing. Many GCP services that
## output JSON to Google PubSub base64-encode the JSON payload.
# base64_data = false
```

### Multiple Subscriptions and Topics
Expand Down
20 changes: 19 additions & 1 deletion plugins/inputs/cloud_pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"cloud.google.com/go/pubsub"
"encoding/base64"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
Expand Down Expand Up @@ -40,6 +41,8 @@ type PubSub struct {
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
RetryReceiveDelaySeconds int `toml:"retry_delay_seconds"`

Base64Data bool `toml:"base64_data"`

sub subscription
stubSub func() subscription

Expand Down Expand Up @@ -169,7 +172,18 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error {
return fmt.Errorf("message longer than max_message_len (%d > %d)", len(msg.Data()), ps.MaxMessageLen)
}

metrics, err := ps.parser.Parse(msg.Data())
var data []byte
if ps.Base64Data {
strData, err := base64.StdEncoding.DecodeString(string(msg.Data()))
if err != nil {
return fmt.Errorf("unable to base64 decode message: %v", err)
}
data = []byte(strData)
} else {
data = msg.Data()
}

metrics, err := ps.parser.Parse(data)
if err != nil {
msg.Ack()
return err
Expand Down Expand Up @@ -345,4 +359,8 @@ const sampleConfig = `
## 1. Note this setting does not limit the number of messages that can be
## processed concurrently (use "max_outstanding_messages" instead).
# max_receiver_go_routines = 0
## Optional. If true, Telegraf will attempt to base64 decode the
## PubSub message data before parsing
# base64_data = false
`
45 changes: 45 additions & 0 deletions plugins/inputs/cloud_pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cloud_pubsub

import (
"encoding/base64"
"errors"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
Expand Down Expand Up @@ -55,6 +56,50 @@ func TestRunParse(t *testing.T) {
validateTestInfluxMetric(t, metric)
}

// Test ingesting InfluxDB-format PubSub message
func TestRunBase64(t *testing.T) {
subId := "sub-run-base64"

testParser, _ := parsers.NewInfluxParser()

sub := &stubSub{
id: subId,
messages: make(chan *testMsg, 100),
}
sub.receiver = testMessagesReceive(sub)

ps := &PubSub{
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subId,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Base64Data: true,
}

acc := &testutil.Accumulator{}
if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
defer ps.Stop()

if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}

testTracker := &testTracker{}
msg := &testMsg{
value: base64.StdEncoding.EncodeToString([]byte(msgInflux)),
tracker: testTracker,
}
sub.messages <- msg

acc.Wait(1)
assert.Equal(t, acc.NFields(), 1)
metric := acc.Metrics[0]
validateTestInfluxMetric(t, metric)
}

func TestRunInvalidMessages(t *testing.T) {
subId := "sub-invalid-messages"

Expand Down
3 changes: 3 additions & 0 deletions plugins/outputs/cloud_pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ generate it using `telegraf --usage cloud_pubsub`.
## Optional. Specifies a timeout for requests to the PubSub API.
# publish_timeout = "30s"

## Optional. If true, published PubSub message data will be base64-encoded.
# base64_data = false

## Optional. PubSub attributes to add to metrics.
# [[inputs.pubsub.attributes]]
# my_attr = "tag_value"
Expand Down
17 changes: 17 additions & 0 deletions plugins/outputs/cloud_pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"cloud.google.com/go/pubsub"
"encoding/base64"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
Expand Down Expand Up @@ -56,6 +57,9 @@ const sampleConfig = `
## Optional. Specifies a timeout for requests to the PubSub API.
# publish_timeout = "30s"
## Optional. If true, published PubSub message data will be base64-encoded.
# base64_data = false
## Optional. PubSub attributes to add to metrics.
# [[inputs.pubsub.attributes]]
# my_attr = "tag_value"
Expand All @@ -72,6 +76,7 @@ type PubSub struct {
PublishByteThreshold int `toml:"publish_byte_threshold"`
PublishNumGoroutines int `toml:"publish_num_go_routines"`
PublishTimeout internal.Duration `toml:"publish_timeout"`
Base64Data bool `toml:"base64_data"`

t topic
c *pubsub.Client
Expand Down Expand Up @@ -207,6 +212,12 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro
if err != nil {
return nil, err
}

if ps.Base64Data {
encoded := base64.StdEncoding.EncodeToString(b)
b = []byte(encoded)
}

msg := &pubsub.Message{Data: b}
if ps.Attributes != nil {
msg.Attributes = ps.Attributes
Expand All @@ -220,6 +231,12 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro
if err != nil {
return nil, err
}

if ps.Base64Data {
encoded := base64.StdEncoding.EncodeToString(b)
b = []byte(encoded)
}

msgs[i] = &pubsub.Message{
Data: b,
}
Expand Down
50 changes: 43 additions & 7 deletions plugins/outputs/cloud_pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"cloud.google.com/go/pubsub"
"encoding/base64"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
Expand All @@ -26,7 +27,7 @@ func TestPubSub_WriteSingle(t *testing.T) {
}

for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published)
verifyRawMetricPublished(t, testM.m, topic.published)
}
}

Expand All @@ -48,7 +49,7 @@ func TestPubSub_WriteWithAttribute(t *testing.T) {
}

for _, testM := range testMetrics {
msg := verifyMetricPublished(t, testM.m, topic.published)
msg := verifyRawMetricPublished(t, testM.m, topic.published)
assert.Equalf(t, "bar1", msg.Attributes["foo1"], "expected attribute foo1=bar1")
assert.Equalf(t, "bar2", msg.Attributes["foo2"], "expected attribute foo2=bar2")
}
Expand All @@ -70,7 +71,7 @@ func TestPubSub_WriteMultiple(t *testing.T) {
}

for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published)
verifyRawMetricPublished(t, testM.m, topic.published)
}
assert.Equalf(t, 1, topic.bundleCount, "unexpected bundle count")
}
Expand All @@ -94,7 +95,7 @@ func TestPubSub_WriteOverCountThreshold(t *testing.T) {
}

for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published)
verifyRawMetricPublished(t, testM.m, topic.published)
}
assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count")
}
Expand All @@ -117,11 +118,33 @@ func TestPubSub_WriteOverByteThreshold(t *testing.T) {
}

for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published)
verifyRawMetricPublished(t, testM.m, topic.published)
}
assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count")
}

func TestPubSub_WriteBase64Single(t *testing.T) {

testMetrics := []testMetric{
{testutil.TestMetric("value_1", "test"), false /*return error */},
{testutil.TestMetric("value_2", "test"), false},
}

settings := pubsub.DefaultPublishSettings
settings.CountThreshold = 1
ps, topic, metrics := getTestResources(t, settings, testMetrics)
ps.Base64Data = true

err := ps.Write(metrics)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}

for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published, true /* base64encoded */)
}
}

func TestPubSub_Error(t *testing.T) {
testMetrics := []testMetric{
// Force this batch to return error
Expand All @@ -141,7 +164,11 @@ func TestPubSub_Error(t *testing.T) {
}
}

func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message) *pubsub.Message {
func verifyRawMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message) *pubsub.Message {
return verifyMetricPublished(t, m, published, false)
}

func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message, base64Encoded bool) *pubsub.Message {
p, _ := parsers.NewInfluxParser()

v, _ := m.GetField("value")
Expand All @@ -150,7 +177,16 @@ func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string
t.Fatalf("expected metric to get published (value: %s)", v.(string))
}

parsed, err := p.Parse(psMsg.Data)
data := psMsg.Data
if base64Encoded {
v, err := base64.StdEncoding.DecodeString(string(psMsg.Data))
if err != nil {
t.Fatalf("Unable to decode expected base64-encoded message: %s", err)
}
data = []byte(v)
}

parsed, err := p.Parse(data)
if err != nil {
t.Fatalf("could not parse influxdb metric from published message: %s", string(psMsg.Data))
}
Expand Down
12 changes: 11 additions & 1 deletion plugins/outputs/cloud_pubsub/topic_stubbed.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"cloud.google.com/go/pubsub"
"encoding/base64"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers"
Expand Down Expand Up @@ -180,8 +181,17 @@ func (t *stubTopic) parseIDs(msg *pubsub.Message) []string {
p, _ := parsers.NewInfluxParser()
metrics, err := p.Parse(msg.Data)
if err != nil {
t.Fatalf("unexpected parsing error: %v", err)
// Just attempt to base64-decode first before returning error.
d, err := base64.StdEncoding.DecodeString(string(msg.Data))
if err != nil {
t.Errorf("unable to base64-decode potential test message: %v", err)
}
metrics, err = p.Parse(d)
if err != nil {
t.Fatalf("unexpected parsing error: %v", err)
}
}

ids := make([]string, len(metrics))
for i, met := range metrics {
id, _ := met.GetField("value")
Expand Down

0 comments on commit dd5f03e

Please sign in to comment.