Event Processor Host does not start consuming messages #276

Closed
BICKELC opened this issue Oct 26, 2022 · 5 comments · Fixed by #277

@BICKELC

BICKELC commented Oct 26, 2022

After upgrading azure-event-hubs-go to 3.3.19, the Event Processor Host no longer starts consuming messages.
The following code is based on the example in the README:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"

	eventhub "github.com/Azure/azure-event-hubs-go/v3"
	"github.com/Azure/azure-event-hubs-go/v3/eph"
	"github.com/Azure/azure-event-hubs-go/v3/storage"
	"github.com/Azure/azure-storage-blob-go/azblob"
	"github.com/Azure/go-autorest/autorest/azure"
)

func main() {
	// Azure Storage account information
	storageAccountName := os.Getenv("StorageAccountName")
	fmt.Printf("storageAccountName is: %s\n", storageAccountName)
	storageAccountKey := os.Getenv("StorageAccountKey")
	fmt.Printf("storageAccountKey is: %s\n", storageAccountKey)
	// Azure Storage container to store leases and checkpoints
	storageContainerName := "ncsconsumergroup"

	// Azure Event Hub connection string
	eventHubConnStr := os.Getenv("EventhubConnectionString")
	fmt.Printf("eventHubConnStr is: %s\n", eventHubConnStr)

	// create a new Azure Storage Leaser / Checkpointer
	cred, err := azblob.NewSharedKeyCredential(storageAccountName, storageAccountKey)
	if err != nil {
		fmt.Println(err)
		return
	}

	leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, storageAccountName, storageContainerName, azure.PublicCloud)
	if err != nil {
		fmt.Println(err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	// create a new EPH processor
	processor, err := eph.NewFromConnectionString(ctx, eventHubConnStr, leaserCheckpointer, leaserCheckpointer)
	if err != nil {
		fmt.Println(err)
		return
	}

	// register a message handler -- many can be registered
	handlerID, err := processor.RegisterHandler(ctx,
		func(c context.Context, e *eventhub.Event) error {
			fmt.Println(string(e.Data))
			return nil
		})
	if err != nil {
		fmt.Println(err)
		return
	}

	fmt.Printf("handler id: %q is running\n", handlerID)

	// unregister a handler to stop that handler from receiving events
	// processor.UnregisterHandler(ctx, handlerID)

	// start handling messages from all of the partitions balancing across multiple consumers
	err = processor.StartNonBlocking(ctx)
	if err != nil {
		fmt.Println("ERROR on StartNonBlocking")
		fmt.Println(err)
		return
	}

	// Wait for a signal to quit:
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt) // os.Kill (SIGKILL) cannot be trapped, so registering it has no effect
	<-signalChan

	err = processor.Close(context.Background())
	if err != nil {
		fmt.Println(err)
		return
	}
}

The first time this code runs, there is no issue. But as soon as the lease and checkpoint blobs already exist in the storage container (i.e. from the second run on), it fails with the following output:

storageAccountName is: XXX
storageAccountKey is: XXX
eventHubConnStr is: XXX
handler id: "a39d161d-341f-47c9-8414-66fb384ee9e4" is running

    ______                 __  __  __      __
   / ____/   _____  ____  / /_/ / / /_  __/ /_  _____
  / __/ | | / / _ \/ __ \/ __/ /_/ / / / / __ \/ ___/
 / /___ | |/ /  __/ / / / /_/ __  / /_/ / /_/ (__  )
/_____/ |___/\___/_/ /_/\__/_/ /_/\__,_/_.___/____/

ERROR on StartNonBlocking
-> github.com/Azure/azure-event-hubs-go/v3/internal/azure-storage-blob-go/azblob.newStorageError, /go/src/XXX/vendor/github.com/Azure/azure-event-hubs-go/v3/internal/azure-storage-blob-go/azblob/zc_storage_error.go:43
===== RESPONSE ERROR (ServiceCode=BlobAlreadyExists) =====
Description=The specified blob already exists.
RequestId:2f400cd9-901e-0031-724a-e988ee000000
Time:2022-10-26T14:50:50.1040632Z, Details:
   Code: BlobAlreadyExists
   PUT https://XXX.blob.core.windows.net/ncsconsumergroup/0?timeout=20
   Authorization: REDACTED
   Content-Length: [80]
   If-None-Match: [*]
   User-Agent: [Azure-Storage/0.15 (go1.19.2; linux)]
   X-Ms-Blob-Cache-Control: []
   X-Ms-Blob-Content-Disposition: []
   X-Ms-Blob-Content-Encoding: []
   X-Ms-Blob-Content-Language: []
   X-Ms-Blob-Content-Type: []
   X-Ms-Blob-Type: [BlockBlob]
   X-Ms-Client-Request-Id: [2f349a3d-b8d5-470d-57c2-69ce87bf0625]
   X-Ms-Date: [Wed, 26 Oct 2022 14:50:50 GMT]
   X-Ms-Version: [2018-11-09]
   --------------------------------------------------------------------------------
   RESPONSE Status: 409 The specified blob already exists.
   Content-Length: [220]
   Content-Type: [application/xml]
   Date: [Wed, 26 Oct 2022 14:50:49 GMT]
   Server: [Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0]
   X-Ms-Error-Code: [BlobAlreadyExists]
   X-Ms-Request-Id: [2f400cd9-901e-0031-724a-e988ee000000]
   X-Ms-Version: [2018-11-09]

(I replaced sensitive values with XXX; the actual output contained the expected values.)
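The log shows why only the second run fails: the lease blob for partition 0 is written with a conditional PUT carrying `If-None-Match: *`, which asks Blob Storage to create the blob only if it does not exist yet. On the first run that succeeds; on every later run the blob is already there, so the service answers 409 BlobAlreadyExists. The sketch below models only that conditional-create rule with a hypothetical in-memory `blobStore` type; it is not the azblob API and ignores leases, ETags, and everything else the real service does.

```go
package main

import (
	"fmt"
	"net/http"
	"sync"
)

// blobStore is a hypothetical in-memory stand-in for a Blob Storage container.
// It is NOT the azblob API; it only models conditional-create semantics.
type blobStore struct {
	mu    sync.Mutex
	blobs map[string][]byte
}

func newBlobStore() *blobStore {
	return &blobStore{blobs: make(map[string][]byte)}
}

// putIfNoneMatch models "PUT <blob>" sent with "If-None-Match: *":
// the write succeeds only if no blob with that name exists yet.
// It returns the HTTP status code the service would answer with.
func (s *blobStore) putIfNoneMatch(name string, data []byte) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.blobs[name]; exists {
		return http.StatusConflict // 409, as in the log above
	}
	s.blobs[name] = data
	return http.StatusCreated // 201
}

func main() {
	store := newBlobStore()
	// First run: the lease blob for partition "0" does not exist yet.
	fmt.Println(store.putIfNoneMatch("0", []byte("{}"))) // 201
	// Second run: the same conditional PUT now conflicts.
	fmt.Println(store.putIfNoneMatch("0", []byte("{}"))) // 409
}
```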

Our storage account is of kind Storage (general purpose v1), but the same issue occurs with StorageV2.

The code above runs as a pod on AKS.

While analysing the issue, I realised that version 3.3.18 of azure-event-hubs-go had the same problem, but there the error during setup was ignored (2138252#diff-e676c998b671657559774d8c366741fbcad16810aa28025ba87b945718d67378L414).

Is there something wrong with my code above?

Expected Behavior

Event Processor Host has no issues on startup.

Actual Behavior

The Event Processor Host does not start consuming messages and fails with the error "BlobAlreadyExists".

Environment

  • OS: Linux (compiled Go application running in a Docker container based on the scratch image)
  • Go version: 1.19.2
  • Version of Library: v3.3.19
@richardpark-msft
Member

Hi @BICKELC, I'll take a look. In 3.3.19 we made a change that allows newer versions of the azblob dependency to be used.

Do you know which azblob version you're referencing in your go.mod?

@richardpark-msft richardpark-msft self-assigned this Oct 26, 2022
@BICKELC
Author

BICKELC commented Oct 27, 2022

Hi @richardpark-msft,
Thanks a lot for the fast reply.

Details for the example above with the new Event Hubs library (3.3.19)

The example above depends on azblob version github.com/Azure/azure-storage-blob-go v0.15.0.

According to my analysis, the error comes from this line:

if _, err := h.leaser.EnsureLease(ctx, partitionID); err != nil {

Additional analysis with the old version of the Event Hubs library

While analysing the issue, I also reverted the versions in the go.mod file:

	github.com/Azure/azure-event-hubs-go/v3 => github.com/Azure/azure-event-hubs-go/v3 v3.3.18
	github.com/Azure/azure-storage-blob-go => github.com/Azure/azure-storage-blob-go v0.6.0

In addition, I changed the old setup method (in version 3.3.18) as follows:

diff --git a/vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go b/vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go
index fda6965..39f7178 100644
--- a/vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go
+++ b/vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go
@@ -411,7 +411,9 @@ func (h *EventProcessorHost) setup(ctx context.Context) error {
                scheduler := newScheduler(h)

                for _, partitionID := range h.partitionIDs {
-                       h.leaser.EnsureLease(ctx, partitionID)
+                       if _, err := h.leaser.EnsureLease(ctx, partitionID); err != nil {
+                               fmt.Println(err)
+                       }
                        h.checkpointer.EnsureCheckpoint(ctx, partitionID)
                }

Interestingly, I received exactly the same error there (once per partition). However, the old version started without issues because the error was ignored:

-> github.com/Azure/azure-storage-blob-go/azblob.newStorageError, /go/src/XXX/vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_storage_error.go:42
===== RESPONSE ERROR (ServiceCode=BlobAlreadyExists) =====
Description=The specified blob already exists.
RequestId:e1f235cd-a01e-0053-01d5-e9123d000000
Time:2022-10-27T07:24:41.2518645Z, Details:
   Code: BlobAlreadyExists
   PUT https://XXX.blob.core.windows.net/ncsconsumergroup/0?timeout=61
   Authorization: REDACTED
   Content-Length: [80]
   If-None-Match: [*]
   User-Agent: [Azure-Storage/0.6 (go1.19.2; linux)]
   X-Ms-Blob-Cache-Control: []
   X-Ms-Blob-Content-Disposition: []
   X-Ms-Blob-Content-Encoding: []
   X-Ms-Blob-Content-Language: []
   X-Ms-Blob-Content-Type: []
   X-Ms-Blob-Type: [BlockBlob]
   X-Ms-Client-Request-Id: [1f8f5663-66c8-4ec1-5bb1-28d2a0936650]
   X-Ms-Date: [Thu, 27 Oct 2022 07:24:41 GMT]
   X-Ms-Version: [2018-11-09]
   --------------------------------------------------------------------------------
   RESPONSE Status: 409 The specified blob already exists.
   Content-Length: [220]
   Content-Type: [application/xml]
   Date: [Thu, 27 Oct 2022 07:24:40 GMT]
   Server: [Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0]
   X-Ms-Error-Code: [BlobAlreadyExists]
   X-Ms-Request-Id: [e1f235cd-a01e-0053-01d5-e9123d000000]
   X-Ms-Version: [2018-11-09]

@richardpark-msft
Member

I'm adding a test now, but I think we're missing handling of "409 Conflict" as a valid return status. It's an area I'm not super familiar with, so bear with me. :)

richardpark-msft added a commit that referenced this issue Nov 8, 2022
Storage failures don't return a Response, and require us to do an errors.As() and check the returned error instead. This checks for the two codes that can come back if the blob already exists (409) or if the blob exists _and_ it has an active storage lease (412).

Also, fixed a race condition in the LeaseReceiver that was causing Storage/TestMultiple() to fail.

Fixes #276
@BICKELC
Author

BICKELC commented Nov 9, 2022

Thanks a lot for fixing this issue :)

@richardpark-msft
Member

Just tagged an official release as well: https://github.com/Azure/azure-event-hubs-go/releases/tag/v3.3.20

Thank you for reporting the issue, it really helps us out.
