This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Message.Complete() fails if time elapsed since receipt more than 2h #249

Closed
orzel7 opened this issue Sep 3, 2021 · 6 comments


orzel7 commented Sep 3, 2021

The code below fails at m.Complete(ctx) when the simulated processing takes 2h30m, with this error:
*Error{Condition: amqp:connection:forced, Description: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:5220ced3241b41aa899db806978be5cc_G12, SystemTracker:gateway7, Timestamp:2021-09-03T08:46:10, Info: map[]}

The same code works fine if I shorten the sleep to time.Sleep(time.Hour*2).
I have already tested this example with the newest version, 0.11.

package main

import (
	"context"
	"fmt"
	"time"

	servicebus "github.com/Azure/azure-service-bus-go"
)

const connectionString = "Endpoint=sb://....."
essKey="....."
const queueName = "queue.name"
const renewLockInterval = time.Second * 20

func main() {
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
	if err != nil {
		fmt.Println(err)
		return
	}

	queue, err := ns.NewQueue(queueName)
	if err != nil {
		fmt.Println(err)
		return
	}

	ctx := context.Background()
	rcv, err := queue.NewReceiver(ctx)
	if err != nil {
		fmt.Println(err)
		return
	}

	messageHandler := func(ctx context.Context, m *servicebus.Message) error {
		ticker := time.NewTicker(renewLockInterval)
		quit := make(chan struct{})
		//Periodically renew lock on received message
		go func() {
		exit:
			for {
				select {
				case <-quit:
					break exit
				case <-ticker.C:
					if err := queue.RenewLocks(ctx, m); err != nil {
						break exit
					}
				}
			}
			ticker.Stop()
		}()
		//Emulate long processing
		fmt.Println("Start processing...")
		time.Sleep(time.Hour*2 + time.Minute*30)
		close(quit)
		fmt.Println("Done")
		err := m.Complete(ctx)
		fmt.Println("After complete", err)
		return err
	}

	fmt.Println("Waiting for message...")
	if err := rcv.ReceiveOne(ctx, servicebus.HandlerFunc(messageHandler)); err != nil {
		fmt.Println(err)
	}
}
Author

orzel7 commented Sep 6, 2021

It looks like the root of the problem is that queue.RenewLocks(ctx, m) starts failing after 2h5m.
So, if m.Complete is called between 2h5m and 2h10m (2h5m + 300000 ms), it returns: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue.
If the sleep time is between 2h10m and 2h15m, the error is: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'.
And if the sleep time is longer than 2h20m, the error is: The link 'G14:83746879:9dljfTHKwAUGw_mzCQ3cQT_PlApM_Mro1oTmLoo4FU2yN45d3JERtA' is force detached. Code: consumer(link22930003). Details: AmqpMessageConsumer.IdleTimerExpired: Idle timeout: 00:10:00.
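
For reference, the 2h10m boundary above is just the two reported figures added together. A minimal sketch of that arithmetic (the constant and function names are made up for illustration; only the two durations come from this thread):

```go
package main

import (
	"fmt"
	"time"
)

// Both values are taken from the observations in this issue.
const (
	renewFailsAfter   = 2*time.Hour + 5*time.Minute // RenewLocks starts failing here
	connectionIdleMax = 300000 * time.Millisecond   // from the amqp:connection:forced error text
)

// connectionClosedAfter returns the elapsed time at which the idle
// connection would be closed, assuming the idle period starts when the
// renewals stop.
func connectionClosedAfter() time.Duration {
	return renewFailsAfter + connectionIdleMax
}

func main() {
	fmt.Println(connectionClosedAfter()) // 2h10m0s
}
```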

Member

serbrech commented Sep 9, 2021

I might be wrong, but I think that in your code, if the RenewLocks call fails just once, the goroutine exits silently.
So you would stop renewing the lock?

Complete sends a disposition on the receiver link.
That link is idle because you only receive one message and do nothing else. I think that would fail too after a little more than 10 minutes?
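
To illustrate the first point, here is a rough sketch of a renewal loop that reports failures instead of exiting silently. It does not use the Service Bus package: `renewFunc`, `keepRenewing` and `demo` are hypothetical names, and `renewFunc` stands in for a call such as queue.RenewLocks(ctx, m). Whether to keep retrying after a failure or to abort processing depends on the error, so treat this as one possible shape rather than a recommendation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// renewFunc is a hypothetical stand-in for a call such as queue.RenewLocks(ctx, m).
type renewFunc func(ctx context.Context) error

// keepRenewing calls renew on every tick until ctx is cancelled.
// Failures are sent to errs instead of ending the loop silently.
func keepRenewing(ctx context.Context, interval time.Duration, renew renewFunc, errs chan<- error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := renew(ctx); err != nil {
				select {
				case errs <- err: // surface the failure to the caller
				default: // channel full: drop the error rather than block the loop
				}
			}
		}
	}
}

// demo runs the loop against a fake renew function that fails on the
// first call, succeeds afterwards, and cancels the context on the third call.
func demo() (calls int, failures int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	errs := make(chan error, 8)
	done := make(chan struct{})
	renew := func(context.Context) error {
		calls++
		if calls == 1 {
			return errors.New("simulated renew failure")
		}
		if calls == 3 {
			cancel()
		}
		return nil
	}
	go func() {
		keepRenewing(ctx, time.Millisecond, renew, errs)
		close(done)
	}()
	<-done // calls is only read after the loop goroutine has finished
	return calls, len(errs)
}

func main() {
	calls, failures := demo()
	fmt.Println("renew calls:", calls, "reported failures:", failures)
}
```

In the handler from the original post, the caller could read from `errs` to log the failure or to stop processing early once the lock is known to be lost.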

Member

serbrech commented Sep 9, 2021

@jhendrixMSFT @richardpark-msft I think that's the same case I mentioned, about link heartbeat?

Author

orzel7 commented Sep 10, 2021

> I might be wrong, but I think that in your code, if the RenewLocks call fails just once, the goroutine exits silently.
> So you would stop renewing the lock?

Right

> Complete sends a disposition on the receiver link.
> That link is idle because you only receive one message and do nothing else. I think that would fail too after a little more than 10 minutes?

Exactly

Author

orzel7 commented Sep 10, 2021

If I increase the token validity time from 2 hours (the default) to 3 hours, this example works without error.

Member

richardpark-msft commented

Hi @orzel7,

We've moved development of this package to the azure-sdk-for-go repo. (I know you already know this, since I've posted it on a few of your issues.)

I have a test that's similar to this that I'll enhance, but it does seem like the root cause is related to when we lose the lock, which seems to be related to components idling out. There were a few changes in the way that we manage links, which I wrote about more in #246.

There is also an additional change we made in how we calculate when the next token renewal occurs. Due to clock drift it's possible for our local time to not match the service time. If we attempt to be too exacting on when we do the next claim renewal we can miss our window, and end up with a connection that gets closed due to lack of authorization.

.NET Service Bus has a simple calculation they use which we have replicated into the Go codebase, which does account for this:
https://github.com/Azure/azure-sdk-for-go/blob/c5b9d7c743aee9d01c26907e6923605b7096c51b/sdk/messaging/azservicebus/internal/namespace.go#L505
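
As a rough illustration of the general idea only (this is not the calculation at the linked line, and the margin below is an assumed value picked for the example), renewing a little earlier than the nominal expiry leaves room for clock drift. `clockDriftMargin`, `nextRefreshIn` and `exampleWait` are hypothetical names:

```go
package main

import (
	"fmt"
	"time"
)

// clockDriftMargin is an assumed safety margin for illustration,
// not a constant taken from the SDK.
const clockDriftMargin = 5 * time.Minute

// nextRefreshIn returns how long to wait before renewing a token that
// expires at expiresOn. It subtracts the margin so that a local clock
// running slightly behind the service does not miss the renewal window.
// A result of 0 means "renew immediately".
func nextRefreshIn(now, expiresOn time.Time) time.Duration {
	wait := expiresOn.Sub(now) - clockDriftMargin
	if wait < 0 {
		return 0
	}
	return wait
}

// exampleWait applies nextRefreshIn to a token with the 2-hour
// validity mentioned earlier in this thread.
func exampleWait() time.Duration {
	now := time.Date(2021, 9, 3, 6, 0, 0, 0, time.UTC)
	return nextRefreshIn(now, now.Add(2*time.Hour))
}

func main() {
	fmt.Println(exampleWait()) // 1h55m0s
}
```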
