diff --git a/pkg/scalers/azure_eventhub.go b/pkg/scalers/azure_eventhub.go index cbcde83dbdb..7abedef0bef 100644 --- a/pkg/scalers/azure_eventhub.go +++ b/pkg/scalers/azure_eventhub.go @@ -110,7 +110,7 @@ func GetCheckpointFromBlobStorage(ctx context.Context, partitionID string, event get, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false) if err != nil { - return Checkpoint{}, fmt.Errorf("unable to download file from blob storage: %s", err) + return Checkpoint{}, fmt.Errorf("unable to download file from blob storage: %w", err) } blobData := &bytes.Buffer{} diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index 9e1de7fd70a..060b505f46a 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -2,6 +2,7 @@ package scalers import ( "context" + "errors" "fmt" "math" "strconv" @@ -116,15 +117,22 @@ func parseAzureEventHubMetadata(metadata, resolvedEnv map[string]string) (*Event } //GetUnprocessedEventCountInPartition gets number of unprocessed events in a given partition -func (scaler *AzureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Context, partitionID string) (newEventCount int64, err error) { +func (scaler *AzureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Context, partitionID string) (newEventCount int64, checkpoint Checkpoint, err error) { partitionInfo, err := scaler.client.GetPartitionInformation(ctx, partitionID) if err != nil { - return -1, fmt.Errorf("unable to get partition info: %s", err) + return -1, Checkpoint{}, fmt.Errorf("unable to get partition info: %s", err) } - checkpoint, err := GetCheckpointFromBlobStorage(ctx, partitionID, *scaler.metadata) + checkpoint, err = GetCheckpointFromBlobStorage(ctx, partitionID, *scaler.metadata) if err != nil { - return -1, fmt.Errorf("unable to get checkpoint from storage: %s", err) + // if blob not found return the total partition event count + err = errors.Unwrap(err) + if stErr, ok := err.(azblob.StorageError); ok { + if stErr.ServiceCode() == azblob.ServiceCodeBlobNotFound { + return GetUnprocessedEventCountWithoutCheckpoint(*partitionInfo), Checkpoint{}, nil + } + } + return -1, Checkpoint{}, fmt.Errorf("unable to get checkpoint from storage: %s", err) } unprocessedEventCountInPartition := int64(0) @@ -133,7 +141,7 @@ func (scaler *AzureEventHubScaler) GetUnprocessedEventCountInPartition(ctx conte if partitionInfo.LastSequenceNumber > checkpoint.SequenceNumber { unprocessedEventCountInPartition = partitionInfo.LastSequenceNumber - checkpoint.SequenceNumber - return unprocessedEventCountInPartition, nil + return unprocessedEventCountInPartition, checkpoint, nil } unprocessedEventCountInPartition = (math.MaxInt64 - partitionInfo.LastSequenceNumber) + checkpoint.SequenceNumber @@ -142,7 +150,18 @@ func (scaler *AzureEventHubScaler) GetUnprocessedEventCountInPartition(ctx conte unprocessedEventCountInPartition = 0 } - return unprocessedEventCountInPartition, nil + return unprocessedEventCountInPartition, checkpoint, nil +} + +// GetUnprocessedEventCountWithoutCheckpoint returns the number of messages on the without a checkoutpoint info +func GetUnprocessedEventCountWithoutCheckpoint(partitionInfo eventhub.HubPartitionRuntimeInformation) int64 { + + // if both values are 0 then there is exactly one message inside the hub. First message after init + if (partitionInfo.BeginningSequenceNumber == 0 && partitionInfo.LastSequenceNumber == 0) || (partitionInfo.BeginningSequenceNumber != partitionInfo.LastSequenceNumber) { + return (partitionInfo.LastSequenceNumber - partitionInfo.BeginningSequenceNumber) + 1 + } + + return 0 } // IsActive determines if eventhub is active based on number of unprocessed events @@ -158,7 +177,7 @@ func (scaler *AzureEventHubScaler) IsActive(ctx context.Context) (bool, error) { for i := 0; i < len(partitionIDs); i++ { partitionID := partitionIDs[i] - unprocessedEventCount, err := scaler.GetUnprocessedEventCountInPartition(ctx, partitionID) + unprocessedEventCount, _, err := scaler.GetUnprocessedEventCountInPartition(ctx, partitionID) if err != nil { return false, fmt.Errorf("unable to get unprocessedEventCount for isActive: %s", err) @@ -202,14 +221,9 @@ func (scaler *AzureEventHubScaler) GetMetrics(ctx context.Context, metricName st return []external_metrics.ExternalMetricValue{}, fmt.Errorf("unable to get partitionRuntimeInfo for metrics: %s", err) } - checkpoint, err := GetCheckpointFromBlobStorage(ctx, partitionID, *scaler.metadata) - if err != nil { - return []external_metrics.ExternalMetricValue{}, fmt.Errorf("unable to get checkpoint from storage: %s", err) - } - unprocessedEventCount := int64(0) - unprocessedEventCount, err = scaler.GetUnprocessedEventCountInPartition(ctx, partitionID) + unprocessedEventCount, checkpoint, err := scaler.GetUnprocessedEventCountInPartition(ctx, partitionID) if err != nil { return []external_metrics.ExternalMetricValue{}, fmt.Errorf("unable to get unprocessedEventCount for metrics: %s", err) } diff --git a/pkg/scalers/azure_eventhub_test.go b/pkg/scalers/azure_eventhub_test.go index 29d6ec0ddef..db83845a9e7 100644 --- a/pkg/scalers/azure_eventhub_test.go +++ b/pkg/scalers/azure_eventhub_test.go @@ -126,8 +126,8 @@ func TestGetUnprocessedEventCountInPartition(t *testing.T) { t.Errorf("err creating container: %s", err) } - unprocessedEventCountInPartition0, err0 := testEventHubScaler.GetUnprocessedEventCountInPartition(ctx, "0") - unprocessedEventCountInPartition1, err1 := testEventHubScaler.GetUnprocessedEventCountInPartition(ctx, "1") + unprocessedEventCountInPartition0, _, err0 := testEventHubScaler.GetUnprocessedEventCountInPartition(ctx, "0") + unprocessedEventCountInPartition1, _, err1 := testEventHubScaler.GetUnprocessedEventCountInPartition(ctx, "1") if err0 != nil { t.Errorf("Expected success but got error: %s", err0) } @@ -151,6 +151,116 @@ func TestGetUnprocessedEventCountInPartition(t *testing.T) { } } } +func TestGetUnprocessedEventCountIfNoCheckpointExists(t *testing.T) { + t.Log("This test will use the environment variable EVENTHUB_CONNECTION_STRING and STORAGE_CONNECTION_STRING if it is set.") + t.Log("If set, it will connect to the storage account and event hub to determine how many messages are in the event hub.") + t.Logf("EventHub has 1 message in partition 0 and 0 messages in partition 1") + + eventHubKey := os.Getenv("AZURE_EVENTHUB_KEY") + storageConnectionString := os.Getenv("TEST_STORAGE_CONNECTION_STRING") + + if eventHubKey != "" && storageConnectionString != "" { + eventHubConnectionString := fmt.Sprintf("Endpoint=sb://%s.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=%s;EntityPath=%s", testEventHubNamespace, eventHubKey, testEventHubName) + t.Log("Creating event hub client...") + hubOption := eventhub.HubWithPartitionedSender("0") + client, err := eventhub.NewHubFromConnectionString(eventHubConnectionString, hubOption) + if err != nil { + t.Errorf("Expected to create event hub client but got error: %s", err) + } + + _, storageCredentials, err := GetStorageCredentials(storageConnectionString) + if err != nil { + t.Errorf("Expected to generate storage credentials but got error: %s", err) + } + + if eventHubConnectionString == "" { + t.Fatal("Event hub connection string needed for test") + } + + if storageConnectionString == "" { + t.Fatal("Storage connection string needed for test") + } + + // Can actually test that numbers return + testEventHubScaler.metadata.eventHubConnection = eventHubConnectionString + testEventHubScaler.metadata.storageConnection = storageConnectionString + testEventHubScaler.client = client + testEventHubScaler.storageCredentials = storageCredentials + testEventHubScaler.metadata.eventHubConsumerGroup = "$Default" + + // Send 1 message to event hub first + t.Log("Sending message to event hub") + err = SendMessageToEventHub(client) + if err != nil { + t.Error(err) + } + + ctx := context.Background() + unprocessedEventCountInPartition0, _, err0 := testEventHubScaler.GetUnprocessedEventCountInPartition(ctx, "0") + unprocessedEventCountInPartition1, _, err1 := testEventHubScaler.GetUnprocessedEventCountInPartition(ctx, "1") + if err0 != nil { + t.Errorf("Expected success but got error: %s", err0) + } + if err1 != nil { + t.Errorf("Expected success but got error: %s", err1) + } + + if unprocessedEventCountInPartition0 != 1 { + t.Errorf("Expected 1 message in partition 0, got %d", unprocessedEventCountInPartition0) + } + + if unprocessedEventCountInPartition1 != 0 { + t.Errorf("Expected 0 messages in partition 1, got %d", unprocessedEventCountInPartition1) + } + } +} + +func TestGetUnprocessedEventCountWithoutCheckpointReturning1Message(t *testing.T) { + + //After the first message the lastsequencenumber init to 0 + partitionInfo := eventhub.HubPartitionRuntimeInformation{ + PartitionID: "0", + LastSequenceNumber: 0, + BeginningSequenceNumber: 0, + } + + unprocessedEventCountInPartition0 := GetUnprocessedEventCountWithoutCheckpoint(partitionInfo) + + if unprocessedEventCountInPartition0 != 1 { + t.Errorf("Expected 1 messages in partition 0, got %d", unprocessedEventCountInPartition0) + } +} + +func TestGetUnprocessedEventCountWithoutCheckpointReturning0Message(t *testing.T) { + + //An empty partition starts with an equal value on last-/beginning-sequencenumber other than 0 + partitionInfo := eventhub.HubPartitionRuntimeInformation{ + PartitionID: "0", + LastSequenceNumber: 255, + BeginningSequenceNumber: 255, + } + + unprocessedEventCountInPartition0 := GetUnprocessedEventCountWithoutCheckpoint(partitionInfo) + + if unprocessedEventCountInPartition0 != 0 { + t.Errorf("Expected 0 messages in partition 0, got %d", unprocessedEventCountInPartition0) + } +} + +func TestGetUnprocessedEventCountWithoutCheckpointReturning2Messages(t *testing.T) { + + partitionInfo := eventhub.HubPartitionRuntimeInformation{ + PartitionID: "0", + LastSequenceNumber: 1, + BeginningSequenceNumber: 0, + } + + unprocessedEventCountInPartition0 := GetUnprocessedEventCountWithoutCheckpoint(partitionInfo) + + if unprocessedEventCountInPartition0 != 2 { + t.Errorf("Expected 0 messages in partition 0, got %d", unprocessedEventCountInPartition0) + } +} const csharpSdkCheckpoint = `{ "Epoch": 123456,