Skip to content

Commit

Permalink
refactor eventhub checkpointStrategy selection
Browse files Browse the repository at this point in the history
  • Loading branch information
christle committed Apr 29, 2021
1 parent f722a30 commit 132ff93
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 31 deletions.
14 changes: 8 additions & 6 deletions pkg/scalers/azure/azure_eventhub_checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/url"
"strconv"
"strings"

"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/imdario/mergo"
Expand Down Expand Up @@ -76,22 +77,24 @@ func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer,
}

func newCheckpointer(info EventHubInfo, partitionID string) checkpointer {
if info.CheckpointStrategy == "GoSdk" {

switch true {
case (info.CheckpointStrategy == "GoSdk"):
return &goSdkCheckpointer{
containerName: info.BlobContainer,
partitionID: partitionID,
}
} else if info.CheckpointStrategy == "BlobMetadata" {
case (info.CheckpointStrategy == "BlobMetadata"):
return &blobMetadataCheckpointer{
containerName: info.BlobContainer,
partitionID: partitionID,
}
} else if info.CheckpointStrategy == "AzureWebJob" || info.BlobContainer == "" {
case (info.CheckpointStrategy == "AzureWebJob" || info.BlobContainer == ""):
return &azureWebjobCheckpointer{
containerName: "azure-webjobs-eventhub",
partitionID: partitionID,
}
} else {
default:
return &defaultCheckpointer{
containerName: info.BlobContainer,
partitionID: partitionID,
Expand Down Expand Up @@ -129,8 +132,7 @@ func (checkpointer *blobMetadataCheckpointer) resolvePath(info EventHubInfo) (*u
return nil, err
}

path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/checkpoint/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID))

path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/checkpoint/%s", checkpointer.containerName, eventHubNamespace, eventHubName, strings.ToLower(info.EventHubConsumerGroup), checkpointer.partitionID))
return path, nil
}

Expand Down
36 changes: 11 additions & 25 deletions pkg/scalers/azure/azure_eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestCheckpointFromBlobStorageAzureWebjob(t *testing.T) {

partitionID := "0"
offset := "1001"
consumerGroup := "$Default"
consumerGroup := "$Default1"

sequencenumber := int64(1)

Expand Down Expand Up @@ -62,9 +62,9 @@ func TestCheckpointFromBlobStorageDefault(t *testing.T) {
return
}

partitionID := "0"
partitionID := "1"
offset := "1005"
consumerGroup := "$Default"
consumerGroup := "$Default2"

sequencenumber := int64(1)

Expand Down Expand Up @@ -92,7 +92,7 @@ func TestCheckpointFromBlobStorageDefault(t *testing.T) {
BlobContainer: containerName,
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0")
check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
Expand All @@ -103,9 +103,9 @@ func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T
return
}

partitionID := "0"
partitionID := "2"
offset := "1006"
consumerGroup := "$Default"
consumerGroup := "$Default3"

sequencenumber := int64(1)

Expand Down Expand Up @@ -133,7 +133,7 @@ func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T
BlobContainer: containerName,
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0")
check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
Expand All @@ -144,7 +144,7 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) {
return
}

partitionID := "0"
partitionID := "4"
offset := "1002"
consumerGroup := "$default"

Expand Down Expand Up @@ -178,7 +178,7 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) {
CheckpointStrategy: "BlobMetadata",
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0")
check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) {
CheckpointStrategy: "GoSdk",
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0")
check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
Expand Down Expand Up @@ -263,20 +263,6 @@ func TestShouldParseCheckpointForDefault(t *testing.T) {
assert.Equal(t, url.Path, "/DefaultContainer/$Default/0")
}

func TestShouldParseCheckpointForBlobMetadataWithCheckpointStrategy(t *testing.T) {
eventHubInfo := EventHubInfo{
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test",
EventHubConsumerGroup: "$Default",
CheckpointStrategy: "BlobMetadata",
BlobContainer: "containername",
}

cp := newCheckpointer(eventHubInfo, "0")
url, _ := cp.resolvePath(eventHubInfo)

assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$Default/checkpoint/0")
}

func TestShouldParseCheckpointForBlobMetadata(t *testing.T) {
eventHubInfo := EventHubInfo{
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test",
Expand All @@ -288,7 +274,7 @@ func TestShouldParseCheckpointForBlobMetadata(t *testing.T) {
cp := newCheckpointer(eventHubInfo, "0")
url, _ := cp.resolvePath(eventHubInfo)

assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$Default/checkpoint/0")
assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$default/checkpoint/0")
}

func TestShouldParseCheckpointForGoSdk(t *testing.T) {
Expand Down

0 comments on commit 132ff93

Please sign in to comment.