Skip to content

Commit

Permalink
[Filebeat] Add pubsub_alternative_host to gcp pubsub input (elastic#2…
Browse files Browse the repository at this point in the history
…3215)

* Add pubsub_alternative_host to gcp pubsub input

* Apply suggestions

* Add changelog entry

* Add new option comment

* Make error more descriptive and reorder imports

(cherry picked from commit 545598f)
  • Loading branch information
marc-gr committed Dec 22, 2020
1 parent cb6d300 commit 7eabfc8
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Misp improvements: Migration to httpjson v2 config, pagination and deduplication ID {pull}23070[23070]
- Add Google Workspace module and mark Gsuite module as deprecated {pull}22950[22950]
- Mark m365 defender, defender atp, okta and google workspace modules as GA {pull}23113[23113]
- Added `alternative_host` option to google pubsub input {pull}23215[23215]

*Heartbeat*

Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/input/gcppubsub/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type config struct {

// JSON blob containing authentication credentials and key.
CredentialsJSON []byte `config:"credentials_json"`

// Overrides the default Pub/Sub service address and disables TLS. For testing.
AlternativeHost string `config:"alternative_host"`
}

func (c *config) Validate() error {
Expand Down
33 changes: 24 additions & 9 deletions x-pack/filebeat/input/gcppubsub/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"sync"
"time"

"cloud.google.com/go/pubsub"
"github.com/pkg/errors"
"google.golang.org/api/option"
"google.golang.org/grpc"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
Expand Down Expand Up @@ -147,15 +149,7 @@ func (in *pubsubInput) run() error {
ctx, cancel := context.WithCancel(in.workerCtx)
defer cancel()

// Make pubsub client.
opts := []option.ClientOption{option.WithUserAgent(useragent.UserAgent("Filebeat", false))}
if in.CredentialsFile != "" {
opts = append(opts, option.WithCredentialsFile(in.CredentialsFile))
} else if len(in.CredentialsJSON) > 0 {
option.WithCredentialsJSON(in.CredentialsJSON)
}

client, err := pubsub.NewClient(ctx, in.ProjectID, opts...)
client, err := in.newPubsubClient(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -250,3 +244,24 @@ func (in *pubsubInput) getOrCreateSubscription(ctx context.Context, client *pubs

return nil, errors.New("no subscription exists and 'subscription.create' is not enabled")
}

func (in *pubsubInput) newPubsubClient(ctx context.Context) (*pubsub.Client, error) {
opts := []option.ClientOption{option.WithUserAgent(useragent.UserAgent("Filebeat", false))}

if in.AlternativeHost != "" {
// this will be typically set because we want to point the input to a testing pubsub emulator
conn, err := grpc.Dial(in.AlternativeHost, grpc.WithInsecure())
if err != nil {
return nil, fmt.Errorf("cannot connect to alternative host %q: %w", in.AlternativeHost, err)
}
opts = append(opts, option.WithGRPCConn(conn), option.WithTelemetryDisabled())
}

if in.CredentialsFile != "" {
opts = append(opts, option.WithCredentialsFile(in.CredentialsFile))
} else if len(in.CredentialsJSON) > 0 {
opts = append(opts, option.WithCredentialsJSON(in.CredentialsJSON))
}

return pubsub.NewClient(ctx, in.ProjectID, opts...)
}

0 comments on commit 7eabfc8

Please sign in to comment.