Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

language agnostic checkpointing for azure eventhub scaler #1621

Merged
merged 7 commits into from
May 5, 2021
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
- Fixed goroutine leaks in usage of timers ([#1704](https://github.com/kedacore/keda/pull/1704) | [#1739](https://github.com/kedacore/keda/pull/1739))
- Setting timeouts in the HTTP client used by the IBM MQ scaler ([#1758](https://github.com/kedacore/keda/pull/1758))
- Fix cleanup of removed triggers ([#1768](https://github.com/kedacore/keda/pull/1768))
- Eventhub Scaler: Add trigger parameter `checkpointStrategy` to support more language-specific checkpoints ([#1621](https://github.com/kedacore/keda/pull/1621))

### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/go-logr/logr v0.4.0
github.com/go-logr/zapr v0.4.0 // indirect
github.com/go-openapi/spec v0.20.3
github.com/go-playground/assert/v2 v2.0.1
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.6.0
github.com/golang/mock v1.5.0
Expand Down
119 changes: 18 additions & 101 deletions pkg/scalers/azure/azure_eventhub.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,15 @@
package azure

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"strings"

"github.com/imdario/mergo"

"github.com/Azure/azure-amqp-common-go/v3/aad"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/azure"

kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/util"
)

type baseCheckpoint struct {
Epoch int64 `json:"Epoch"`
Offset string `json:"Offset"`
Owner string `json:"Owner"`
Token string `json:"Token"`
}

// Checkpoint is the object eventhub processor stores in storage
// for checkpointing event processors. This matches the object
// stored by the eventhub C# sdk and Java sdk
type Checkpoint struct {
baseCheckpoint
PartitionID string `json:"PartitionId"`
SequenceNumber int64 `json:"SequenceNumber"`
}

// Eventhub python sdk stores the checkpoint differently
type pythonCheckpoint struct {
baseCheckpoint
PartitionID string `json:"partition_id"`
SequenceNumber int64 `json:"sequence_number"`
}

// EventHubInfo to keep event hub connection and resources
type EventHubInfo struct {
EventHubConnection string
Expand All @@ -51,6 +18,7 @@ type EventHubInfo struct {
BlobContainer string
Namespace string
EventHubName string
CheckpointStrategy string
}

// GetEventHubClient returns eventhub client
Expand Down Expand Up @@ -80,74 +48,6 @@ func GetEventHubClient(info EventHubInfo) (*eventhub.Hub, error) {
return nil, aadErr
}

// GetCheckpointFromBlobStorage accesses Blob storage and gets checkpoint information of a partition
func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) {
blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(httpClient, kedav1alpha1.PodIdentityProviderNone, info.StorageConnection, "")
if err != nil {
return Checkpoint{}, err
}

var eventHubNamespace string
var eventHubName string
if info.EventHubConnection != "" {
eventHubNamespace, eventHubName, err = ParseAzureEventHubConnectionString(info.EventHubConnection)
if err != nil {
return Checkpoint{}, err
}
} else {
eventHubNamespace = info.Namespace
eventHubName = info.EventHubName
}

// TODO: add more ways to read from different types of storage and read checkpoints/leases written in different JSON formats
var baseURL *url.URL
// Checking blob store for C# and Java applications
if info.BlobContainer != "" {
// URL format - <storageEndpoint>/<blobContainer>/<eventHubConsumerGroup>/<partitionID>
path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s", info.BlobContainer, info.EventHubConsumerGroup, partitionID))
baseURL = storageEndpoint.ResolveReference(path)
} else {
// Checking blob store for Azure functions
// URL format - <storageEndpoint>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
path, _ := url.Parse(fmt.Sprintf("/azure-webjobs-eventhub/%s/%s/%s/%s", eventHubNamespace, eventHubName, info.EventHubConsumerGroup, partitionID))
baseURL = storageEndpoint.ResolveReference(path)
}

// Create a BlockBlobURL object to a blob in the container.
blobURL := azblob.NewBlockBlobURL(*baseURL, azblob.NewPipeline(blobCreds, azblob.PipelineOptions{}))

get, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
if err != nil {
return Checkpoint{}, fmt.Errorf("unable to download file from blob storage: %w", err)
}

blobData := &bytes.Buffer{}
reader := get.Body(azblob.RetryReaderOptions{})
if _, err := blobData.ReadFrom(reader); err != nil {
return Checkpoint{}, fmt.Errorf("failed to read blob data: %s", err)
}
defer reader.Close() // The client must close the response body when finished with it

return getCheckpoint(blobData.Bytes())
}

func getCheckpoint(bytes []byte) (Checkpoint, error) {
var checkpoint Checkpoint
var pyCheckpoint pythonCheckpoint

if err := json.Unmarshal(bytes, &checkpoint); err != nil {
return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err)
}

if err := json.Unmarshal(bytes, &pyCheckpoint); err != nil {
return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err)
}

err := mergo.Merge(&checkpoint, Checkpoint(pyCheckpoint))

return checkpoint, err
}

// ParseAzureEventHubConnectionString parses Event Hub connection string into (namespace, name)
// Connection string should be in following format:
// Endpoint=sb://eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=secretKey123;EntityPath=eventhub-name
Expand Down Expand Up @@ -177,3 +77,20 @@ func ParseAzureEventHubConnectionString(connectionString string) (string, string

return eventHubNamespace, eventHubName, nil
}

func getHubAndNamespace(info EventHubInfo) (string, string, error) {
var eventHubNamespace string
var eventHubName string
var err error
if info.EventHubConnection != "" {
eventHubNamespace, eventHubName, err = ParseAzureEventHubConnectionString(info.EventHubConnection)
if err != nil {
return "", "", err
}
} else {
eventHubNamespace = info.Namespace
eventHubName = info.EventHubName
}

return eventHubNamespace, eventHubName, nil
}
Loading