Skip to content

Commit

Permalink
eventhub: can scale on new hubs where initially no storage checkpoint…
Browse files Browse the repository at this point in the history
… exists

Signed-off-by: Christian Leinweber <christian.leinweber@maibornwolff.de>
  • Loading branch information
christle committed Apr 30, 2020
1 parent de809b6 commit 3455d74
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pkg/scalers/azure_eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func GetCheckpointFromBlobStorage(ctx context.Context, partitionID string, event

get, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false)
if err != nil {
return Checkpoint{}, fmt.Errorf("unable to download file from blob storage: %s", err)
return Checkpoint{}, fmt.Errorf("unable to download file from blob storage: %w", err)
}

blobData := &bytes.Buffer{}
Expand Down
40 changes: 27 additions & 13 deletions pkg/scalers/azure_eventhub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scalers

import (
"context"
"errors"
"fmt"
"math"
"strconv"
Expand Down Expand Up @@ -116,15 +117,22 @@ func parseAzureEventHubMetadata(metadata, resolvedEnv map[string]string) (*Event
}

//GetUnprocessedEventCountInPartition gets number of unprocessed events in a given partition
func (scaler *AzureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Context, partitionID string) (newEventCount int64, err error) {
func (scaler *AzureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Context, partitionID string) (newEventCount int64, checkpoint Checkpoint, err error) {
partitionInfo, err := scaler.client.GetPartitionInformation(ctx, partitionID)
if err != nil {
return -1, fmt.Errorf("unable to get partition info: %s", err)
return -1, Checkpoint{}, fmt.Errorf("unable to get partition info: %s", err)
}

checkpoint, err := GetCheckpointFromBlobStorage(ctx, partitionID, *scaler.metadata)
checkpoint, err = GetCheckpointFromBlobStorage(ctx, partitionID, *scaler.metadata)
if err != nil {
return -1, fmt.Errorf("unable to get checkpoint from storage: %s", err)
// if blob not found return the total partition event count
err = errors.Unwrap(err)
if stErr, ok := err.(azblob.StorageError); ok {
if stErr.ServiceCode() == azblob.ServiceCodeBlobNotFound {
return GetUnprocessedEventCountWithoutCheckpoint(*partitionInfo), Checkpoint{}, nil
}
}
return -1, Checkpoint{}, fmt.Errorf("unable to get checkpoint from storage: %s", err)
}

unprocessedEventCountInPartition := int64(0)
Expand All @@ -133,7 +141,7 @@ func (scaler *AzureEventHubScaler) GetUnprocessedEventCountInPartition(ctx conte
if partitionInfo.LastSequenceNumber > checkpoint.SequenceNumber {
unprocessedEventCountInPartition = partitionInfo.LastSequenceNumber - checkpoint.SequenceNumber

return unprocessedEventCountInPartition, nil
return unprocessedEventCountInPartition, checkpoint, nil
}

unprocessedEventCountInPartition = (math.MaxInt64 - partitionInfo.LastSequenceNumber) + checkpoint.SequenceNumber
Expand All @@ -142,7 +150,18 @@ func (scaler *AzureEventHubScaler) GetUnprocessedEventCountInPartition(ctx conte
unprocessedEventCountInPartition = 0
}

return unprocessedEventCountInPartition, nil
return unprocessedEventCountInPartition, checkpoint, nil
}

// GetUnprocessedEventCountWithoutCheckpoint returns the number of messages on the without a checkoutpoint info
func GetUnprocessedEventCountWithoutCheckpoint(partitionInfo eventhub.HubPartitionRuntimeInformation) int64 {

// if both values are 0 then there is exactly one message inside the hub. First message after init
if (partitionInfo.BeginningSequenceNumber == 0 && partitionInfo.LastSequenceNumber == 0) || (partitionInfo.BeginningSequenceNumber != partitionInfo.LastSequenceNumber) {
return (partitionInfo.LastSequenceNumber - partitionInfo.BeginningSequenceNumber) + 1
}

return 0
}

// IsActive determines if eventhub is active based on number of unprocessed events
Expand All @@ -158,7 +177,7 @@ func (scaler *AzureEventHubScaler) IsActive(ctx context.Context) (bool, error) {
for i := 0; i < len(partitionIDs); i++ {
partitionID := partitionIDs[i]

unprocessedEventCount, err := scaler.GetUnprocessedEventCountInPartition(ctx, partitionID)
unprocessedEventCount, _, err := scaler.GetUnprocessedEventCountInPartition(ctx, partitionID)

if err != nil {
return false, fmt.Errorf("unable to get unprocessedEventCount for isActive: %s", err)
Expand Down Expand Up @@ -202,14 +221,9 @@ func (scaler *AzureEventHubScaler) GetMetrics(ctx context.Context, metricName st
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("unable to get partitionRuntimeInfo for metrics: %s", err)
}

checkpoint, err := GetCheckpointFromBlobStorage(ctx, partitionID, *scaler.metadata)
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("unable to get checkpoint from storage: %s", err)
}

unprocessedEventCount := int64(0)

unprocessedEventCount, err = scaler.GetUnprocessedEventCountInPartition(ctx, partitionID)
unprocessedEventCount, checkpoint, err := scaler.GetUnprocessedEventCountInPartition(ctx, partitionID)
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("unable to get unprocessedEventCount for metrics: %s", err)
}
Expand Down
114 changes: 112 additions & 2 deletions pkg/scalers/azure_eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ func TestGetUnprocessedEventCountInPartition(t *testing.T) {
t.Errorf("err creating container: %s", err)
}

unprocessedEventCountInPartition0, err0 := testEventHubScaler.GetUnprocessedEventCountInPartition(ctx, "0")
unprocessedEventCountInPartition1, err1 := testEventHubScaler.GetUnprocessedEventCountInPartition(ctx, "1")
unprocessedEventCountInPartition0, _, err0 := testEventHubScaler.GetUnprocessedEventCountInPartition(ctx, "0")
unprocessedEventCountInPartition1, _, err1 := testEventHubScaler.GetUnprocessedEventCountInPartition(ctx, "1")
if err0 != nil {
t.Errorf("Expected success but got error: %s", err0)
}
Expand All @@ -151,6 +151,116 @@ func TestGetUnprocessedEventCountInPartition(t *testing.T) {
}
}
}
func TestGetUnprocessedEventCountIfNoCheckpointExists(t *testing.T) {
t.Log("This test will use the environment variable EVENTHUB_CONNECTION_STRING and STORAGE_CONNECTION_STRING if it is set.")
t.Log("If set, it will connect to the storage account and event hub to determine how many messages are in the event hub.")
t.Logf("EventHub has 1 message in partition 0 and 0 messages in partition 1")

eventHubKey := os.Getenv("AZURE_EVENTHUB_KEY")
storageConnectionString := os.Getenv("TEST_STORAGE_CONNECTION_STRING")

if eventHubKey != "" && storageConnectionString != "" {
eventHubConnectionString := fmt.Sprintf("Endpoint=sb://%s.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=%s;EntityPath=%s", testEventHubNamespace, eventHubKey, testEventHubName)
t.Log("Creating event hub client...")
hubOption := eventhub.HubWithPartitionedSender("0")
client, err := eventhub.NewHubFromConnectionString(eventHubConnectionString, hubOption)
if err != nil {
t.Errorf("Expected to create event hub client but got error: %s", err)
}

_, storageCredentials, err := GetStorageCredentials(storageConnectionString)
if err != nil {
t.Errorf("Expected to generate storage credentials but got error: %s", err)
}

if eventHubConnectionString == "" {
t.Fatal("Event hub connection string needed for test")
}

if storageConnectionString == "" {
t.Fatal("Storage connection string needed for test")
}

// Can actually test that numbers return
testEventHubScaler.metadata.eventHubConnection = eventHubConnectionString
testEventHubScaler.metadata.storageConnection = storageConnectionString
testEventHubScaler.client = client
testEventHubScaler.storageCredentials = storageCredentials
testEventHubScaler.metadata.eventHubConsumerGroup = "$Default"

// Send 1 message to event hub first
t.Log("Sending message to event hub")
err = SendMessageToEventHub(client)
if err != nil {
t.Error(err)
}

ctx := context.Background()
unprocessedEventCountInPartition0, _, err0 := testEventHubScaler.GetUnprocessedEventCountInPartition(ctx, "0")
unprocessedEventCountInPartition1, _, err1 := testEventHubScaler.GetUnprocessedEventCountInPartition(ctx, "1")
if err0 != nil {
t.Errorf("Expected success but got error: %s", err0)
}
if err1 != nil {
t.Errorf("Expected success but got error: %s", err1)
}

if unprocessedEventCountInPartition0 != 1 {
t.Errorf("Expected 1 message in partition 0, got %d", unprocessedEventCountInPartition0)
}

if unprocessedEventCountInPartition1 != 0 {
t.Errorf("Expected 0 messages in partition 1, got %d", unprocessedEventCountInPartition1)
}
}
}

func TestGetUnprocessedEventCountWithoutCheckpointReturning1Message(t *testing.T) {

//After the first message the lastsequencenumber init to 0
partitionInfo := eventhub.HubPartitionRuntimeInformation{
PartitionID: "0",
LastSequenceNumber: 0,
BeginningSequenceNumber: 0,
}

unprocessedEventCountInPartition0 := GetUnprocessedEventCountWithoutCheckpoint(partitionInfo)

if unprocessedEventCountInPartition0 != 1 {
t.Errorf("Expected 1 messages in partition 0, got %d", unprocessedEventCountInPartition0)
}
}

func TestGetUnprocessedEventCountWithoutCheckpointReturning0Message(t *testing.T) {

//An empty partition starts with an equal value on last-/beginning-sequencenumber other than 0
partitionInfo := eventhub.HubPartitionRuntimeInformation{
PartitionID: "0",
LastSequenceNumber: 255,
BeginningSequenceNumber: 255,
}

unprocessedEventCountInPartition0 := GetUnprocessedEventCountWithoutCheckpoint(partitionInfo)

if unprocessedEventCountInPartition0 != 0 {
t.Errorf("Expected 0 messages in partition 0, got %d", unprocessedEventCountInPartition0)
}
}

func TestGetUnprocessedEventCountWithoutCheckpointReturning2Messages(t *testing.T) {

partitionInfo := eventhub.HubPartitionRuntimeInformation{
PartitionID: "0",
LastSequenceNumber: 1,
BeginningSequenceNumber: 0,
}

unprocessedEventCountInPartition0 := GetUnprocessedEventCountWithoutCheckpoint(partitionInfo)

if unprocessedEventCountInPartition0 != 2 {
t.Errorf("Expected 0 messages in partition 0, got %d", unprocessedEventCountInPartition0)
}
}

const csharpSdkCheckpoint = `{
"Epoch": 123456,
Expand Down

0 comments on commit 3455d74

Please sign in to comment.