Skip to content

Commit

Permalink
Fix intermittent test cases in cloud_pubsub (#5271)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Jan 9, 2019
1 parent 4b3580c commit e20ba1e
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 15 deletions.
13 changes: 7 additions & 6 deletions plugins/outputs/cloud_pubsub/pubsub.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package cloud_pubsub

import (
"cloud.google.com/go/pubsub"
"context"
"fmt"
"sync"

"cloud.google.com/go/pubsub"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
"sync"
)

const sampleConfig = `
Expand All @@ -28,9 +29,9 @@ const sampleConfig = `
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Optional. Filepath for GCP credentials JSON file to authorize calls to
## PubSub APIs. If not set explicitly, Telegraf will attempt to use
## Application Default Credentials, which is preferred.
## Optional. Filepath for GCP credentials JSON file to authorize calls to
## PubSub APIs. If not set explicitly, Telegraf will attempt to use
## Application Default Credentials, which is preferred.
# credentials_file = "path/to/my/creds.json"
## Optional. If true, will send all metrics per write in one PubSub message.
Expand All @@ -55,7 +56,7 @@ const sampleConfig = `
## Optional. Specifies a timeout for requests to the PubSub API.
# publish_timeout = "30s"
## Optional. PubSub attributes to add to metrics.
# [[inputs.pubsub.attributes]]
# my_attr = "tag_value"
Expand Down
3 changes: 2 additions & 1 deletion plugins/outputs/cloud_pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package cloud_pubsub

import (
"testing"

"cloud.google.com/go/pubsub"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"testing"
)

func TestPubSub_WriteSingle(t *testing.T) {
Expand Down
19 changes: 11 additions & 8 deletions plugins/outputs/cloud_pubsub/topic_stubbed.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package cloud_pubsub

import (
"cloud.google.com/go/pubsub"
"context"
"errors"
"fmt"
"runtime"
"sync"
"testing"
"time"

"cloud.google.com/go/pubsub"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/serializers"
"google.golang.org/api/support/bundler"
"runtime"
"sync"
"testing"
)

const (
Expand Down Expand Up @@ -138,7 +140,7 @@ func (t *stubTopic) SetPublishSettings(settings pubsub.PublishSettings) {

func (t *stubTopic) initBundler() *stubTopic {
t.bundler = bundler.NewBundler(&bundledMsg{}, t.sendBundle())
t.bundler.DelayThreshold = t.Settings.DelayThreshold
t.bundler.DelayThreshold = 10 * time.Second
t.bundler.BundleCountThreshold = t.Settings.CountThreshold
if t.bundler.BundleCountThreshold > pubsub.MaxPublishRequestCount {
t.bundler.BundleCountThreshold = pubsub.MaxPublishRequestCount
Expand All @@ -159,14 +161,15 @@ func (t *stubTopic) sendBundle() func(items interface{}) {

for _, msg := range bundled {
r := msg.stubResult
for _, id := range r.metricIds {
t.published[id] = msg.Message
}

if r.sendError {
r.err <- errors.New(errMockFail)
} else {
r.done <- struct{}{}
}
for _, id := range r.metricIds {
t.published[id] = msg.Message
}
}

t.bundleCount++
Expand Down

0 comments on commit e20ba1e

Please sign in to comment.