Skip to content

Commit

Permalink
address pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bhapas committed Oct 16, 2023
1 parent 89744e8 commit 579705f
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 17 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ The emulator does not require authentication.
## Azure Event Hub Output Reference

The Azure Event Hub output is used to collect data from the azure event hub resource
When specifying a (`--azure-event-hub-connection-string`) , it should be retrieved as mentioned [here](https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string).
When specifying a `--azure-event-hub-connection-string`, it should be retrieved as described [here](https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string).

Sample config:

Expand All @@ -154,6 +154,6 @@ services:

### Options

- `azure-event-hub-connection-string`: The connection string to connect to the Event Hub
- `azure-event-hub-namespace`: The FullyQualifiedNamespace is the Event Hubs namespace name (ex: myeventhub.servicebus.windows.net)
- `azure-event-hub-name`: The name of the Event hub
- `azure-event-hub-connection-string`: The connection string to connect to the Event Hub.
- `azure-event-hub-namespace`: The fully qualified domain name of the Event Hubs namespace. This it the Event Hubs namespace followed by `servicebus.windows.net` (e.g. myeventhub.servicebus.windows.net).
- `azure-event-hub-name`: The name of the Event hub.
4 changes: 2 additions & 2 deletions command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ func ExecuteContext(ctx context.Context) error {
rootCmd.PersistentFlags().StringVar(&opts.GCPPubsubOptions.Subscription, "gcppubsub-subscription", "subscription", "GCP Pubsub subscription name")
rootCmd.PersistentFlags().BoolVar(&opts.GCPPubsubOptions.Clear, "gcppubsub-clear", true, "GCP Pubsub clear flag")

// AzureBlobStorage output flags.
// Azure BlobStorage output flags.
rootCmd.PersistentFlags().StringVar(&opts.AzureBlobStorageOptions.Container, "azure-blob-storage-container", "testcontainer", "Azure Blob Storage container name")
rootCmd.PersistentFlags().StringVar(&opts.AzureBlobStorageOptions.Blob, "azure-blob-storage-blob", "testblob", "Azure Blob Storage blob name")
rootCmd.PersistentFlags().StringVar(&opts.AzureBlobStorageOptions.Port, "azure-blob-storage-port", "10000", "HTTP port used to connect to the blob storage, used for emulators and CI")

// AzureEventHub output flags.
// Azure EventHub output flags.
rootCmd.PersistentFlags().StringVar(&opts.AzureEventHubOptions.FullyQualifiedNamespace, "azure-event-hub-namespace", "myeventhub.servicebus.windows.net", "Azure Eventhub namespace")
rootCmd.PersistentFlags().StringVar(&opts.AzureEventHubOptions.EventHubName, "azure-event-hub-name", "test-eventhub-seis", "Azure Eventhub name")
rootCmd.PersistentFlags().StringVar(&opts.AzureEventHubOptions.ConnectionString, "azure-event-hub-connection-string", "connectionstring", "Azure Eventhub connection string")
Expand Down
20 changes: 10 additions & 10 deletions pkg/output/azureeventhub/azure_event_hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"context"
"fmt"

"github.com/elastic/stream/pkg/output"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"

"github.com/elastic/stream/pkg/output"
)

func init() {
Expand All @@ -31,20 +31,20 @@ func New(opts *output.Options) (output.Output, error) {
if opts.AzureEventHubOptions.ConnectionString != "" {
producerClient, err = azeventhubs.NewProducerClientFromConnectionString(opts.AzureEventHubOptions.ConnectionString, opts.AzureEventHubOptions.EventHubName, nil)
if err != nil {
return nil, fmt.Errorf("error while creating new eventhub producer client from connectionstring : %w", err)
return nil, fmt.Errorf("error while creating new eventhub producer client from connection string: %w", err)
}
} else {
fmt.Print("no connectionstring was provided, falling back to default credentials or environment variable")
fmt.Print("no connection string was provided, falling back to default credentials or environment variable")

// Credentials set as env variables - https://github.com/Azure/azure-sdk-for-go/blob/6b6f76ebe0d2334c83e8b6f89af4fe9d0b1ce631/sdk/azidentity/README.md?plain=1#L156-L187
// Credentials set as env variables - https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/azidentity#environment-variables
defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return nil, fmt.Errorf("missing azure credentials in the environment variables : %w", err)
return nil, fmt.Errorf("missing azure credentials in the environment variables: %w", err)
}

producerClient, err = azeventhubs.NewProducerClient(opts.AzureEventHubOptions.FullyQualifiedNamespace, opts.AzureEventHubOptions.EventHubName, defaultAzureCred, nil)
if err != nil {
return nil, fmt.Errorf("error while creating new eventhub producer client : %w", err)
return nil, fmt.Errorf("error while creating new eventhub producer client: %w", err)
}
}
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -64,16 +64,16 @@ func (o *Output) Close() error {
func (o *Output) Write(b []byte) (int, error) {
batch, err := o.producerClient.NewEventDataBatch(o.cancelCtx, nil)
if err != nil {
return 0, fmt.Errorf("error while creating new event data batch : %w", err)
return 0, fmt.Errorf("error while creating new event data batch: %w", err)
}
eventData := azeventhubs.EventData{Body: b}

if err := batch.AddEventData(&eventData, nil); err != nil {
return 0, fmt.Errorf("error while adding data to event data batch : %w", err)
return 0, fmt.Errorf("error while adding data to event data batch: %w", err)
}

if err := o.producerClient.SendEventDataBatch(context.TODO(), batch, nil); err != nil {
return 0, fmt.Errorf("error while sending event data batch : %w", err)
return 0, fmt.Errorf("error while sending event data batch: %w", err)
}

return len(b), nil
Expand Down
1 change: 0 additions & 1 deletion pkg/output/lumberjack/lumberjack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/go-lumber/server"

Check failure on line 15 in pkg/output/lumberjack/lumberjack_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed with -local github.com/elastic/stream (goimports)

"github.com/elastic/stream/pkg/output"
)

Expand Down

0 comments on commit 579705f

Please sign in to comment.