diff --git a/sdk/messaging/azservicebus/CHANGELOG.md b/sdk/messaging/azservicebus/CHANGELOG.md index 9c256f72e093..1dac65f96a7e 100644 --- a/sdk/messaging/azservicebus/CHANGELOG.md +++ b/sdk/messaging/azservicebus/CHANGELOG.md @@ -11,6 +11,8 @@ enough to fit into a single batch. - Receiving from sessions using a SessionReceiver, created using Client.AcceptSessionFor(Queue|Subscription) or Client.AcceptNextSessionFor(Queue|Subscription). +- Can now renew a message lock for a ReceivedMessage using Receiver.RenewMessageLock() +- Can now renew a session lock for a SessionReceiver using SessionReceiver.RenewSessionLock() ### Breaking Changes diff --git a/sdk/messaging/azservicebus/client.go b/sdk/messaging/azservicebus/client.go index 7af792fddd92..82a1157bec59 100644 --- a/sdk/messaging/azservicebus/client.go +++ b/sdk/messaging/azservicebus/client.go @@ -164,6 +164,7 @@ func (client *Client) NewSender(queueOrTopic string) (*Sender, error) { func (client *Client) AcceptSessionForQueue(ctx context.Context, queue string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error) { id, cleanupOnClose := client.getCleanupForCloseable() sessionReceiver, err := newSessionReceiver( + ctx, &sessionID, client.namespace, &entity{Queue: queue}, @@ -187,6 +188,7 @@ func (client *Client) AcceptSessionForQueue(ctx context.Context, queue string, s func (client *Client) AcceptSessionForSubscription(ctx context.Context, topic string, subscription string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error) { id, cleanupOnClose := client.getCleanupForCloseable() sessionReceiver, err := newSessionReceiver( + ctx, &sessionID, client.namespace, &entity{Topic: topic, Subscription: subscription}, @@ -210,6 +212,7 @@ func (client *Client) AcceptSessionForSubscription(ctx context.Context, topic st func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queue string, options *SessionReceiverOptions) (*SessionReceiver, error) { id, cleanupOnClose := client.getCleanupForCloseable() sessionReceiver, err := newSessionReceiver( + ctx, nil, client.namespace, &entity{Queue: queue}, @@ -233,6 +236,7 @@ func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queue strin func (client *Client) AcceptNextSessionForSubscription(ctx context.Context, topic string, subscription string, options *SessionReceiverOptions) (*SessionReceiver, error) { id, cleanupOnClose := client.getCleanupForCloseable() sessionReceiver, err := newSessionReceiver( + ctx, nil, client.namespace, &entity{Topic: topic, Subscription: subscription}, diff --git a/sdk/messaging/azservicebus/internal/mgmt.go b/sdk/messaging/azservicebus/internal/mgmt.go index 2a4cd859b9b2..06780994688e 100644 --- a/sdk/messaging/azservicebus/internal/mgmt.go +++ b/sdk/messaging/azservicebus/internal/mgmt.go @@ -55,6 +55,9 @@ type MgmtClient interface { ScheduleMessages(ctx context.Context, enqueueTime time.Time, messages ...*amqp.Message) ([]int64, error) CancelScheduled(ctx context.Context, seq ...int64) error + + RenewLocks(ctx context.Context, linkName string, lockTokens []amqp.UUID) ([]time.Time, error) + RenewSessionLock(ctx context.Context, sessionID string) (time.Time, error) } func newMgmtClient(ctx context.Context, links AMQPLinks, ns NamespaceForMgmtClient) (MgmtClient, error) { @@ -391,14 +394,10 @@ func (mc *mgmtClient) PeekMessages(ctx context.Context, fromSequenceNumber int64 // RenewLocks renews the locks in a single 'com.microsoft:renew-lock' operation. // NOTE: this function assumes all the messages received on the same link. -func (mc *mgmtClient) RenewLocks(ctx context.Context, linkName string, lockTokens ...*amqp.UUID) (err error) { +func (mc *mgmtClient) RenewLocks(ctx context.Context, linkName string, lockTokens []amqp.UUID) ([]time.Time, error) { ctx, span := tracing.StartConsumerSpanFromContext(ctx, tracing.SpanRenewLock, Version) defer span.End() - if len(lockTokens) == 0 { - return nil - } - renewRequestMsg := &amqp.Message{ ApplicationProperties: map[string]interface{}{ "operation": "com.microsoft:renew-lock", @@ -413,18 +412,73 @@ func (mc *mgmtClient) RenewLocks(ctx context.Context, linkName string, lockToken } response, err := mc.doRPCWithRetry(ctx, renewRequestMsg, 3, 1*time.Second) + if err != nil { tab.For(ctx).Error(err) - return err + return nil, err } if response.Code != 200 { err := fmt.Errorf("error renewing locks: %v", response.Description) tab.For(ctx).Error(err) - return err + return nil, err } - return nil + // extract the new lock renewal times from the response + // response.Message. + + val, ok := response.Message.Value.(map[string]interface{}) + if !ok { + return nil, NewErrIncorrectType("Message.Value", map[string]interface{}{}, response.Message.Value) + } + + expirations, ok := val["expirations"] + + if !ok { + return nil, NewErrIncorrectType("Message.Value[\"expirations\"]", map[string]interface{}{}, response.Message.Value) + } + + asTimes, ok := expirations.([]time.Time) + + if !ok { + return nil, NewErrIncorrectType("Message.Value[\"expirations\"] as times", map[string]interface{}{}, response.Message.Value) + } + + return asTimes, nil +} + +// RenewSessionLocks renews a session lock. +func (mc *mgmtClient) RenewSessionLock(ctx context.Context, sessionID string) (time.Time, error) { + body := map[string]interface{}{ + "session-id": sessionID, + } + + msg := &amqp.Message{ + Value: body, + ApplicationProperties: map[string]interface{}{ + "operation": "com.microsoft:renew-session-lock", + }, + } + + resp, err := mc.doRPCWithRetry(ctx, msg, 5, 5*time.Second) + + if err != nil { + return time.Time{}, err + } + + m, ok := resp.Message.Value.(map[string]interface{}) + + if !ok { + return time.Time{}, NewErrIncorrectType("Message.Value", map[string]interface{}{}, resp.Message.Value) + } + + lockedUntil, ok := m["expiration"].(time.Time) + + if !ok { + return time.Time{}, NewErrIncorrectType("Message.Value[\"expiration\"] as times", time.Time{}, resp.Message.Value) + } + + return lockedUntil, nil } // SendDisposition allows you settle a message using the management link, rather than via your diff --git a/sdk/messaging/azservicebus/receiver.go b/sdk/messaging/azservicebus/receiver.go index 62f1e6ae8f65..65b9bfe03f07 100644 --- a/sdk/messaging/azservicebus/receiver.go +++ b/sdk/messaging/azservicebus/receiver.go @@ -419,6 +419,26 @@ func (r *Receiver) receiveMessage(ctx context.Context, options *ReceiveOptions) return messages[0], nil } +// RenewLock renews the lock on a message, updating the `LockedUntil` field on `msg`. +func (r *Receiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage) error { + _, _, mgmt, _, err := r.amqpLinks.Get(ctx) + + if err != nil { + return err + } + + newExpirationTime, err := mgmt.RenewLocks(ctx, msg.rawAMQPMessage.LinkName(), []amqp.UUID{ + (amqp.UUID)(msg.LockToken), + }) + + if err != nil { + return err + } + + msg.LockedUntil = &newExpirationTime[0] + return nil +} + // Close permanently closes the receiver. func (r *Receiver) Close(ctx context.Context) error { r.cleanupOnClose() diff --git a/sdk/messaging/azservicebus/receiver_test.go b/sdk/messaging/azservicebus/receiver_test.go index d1fc33b8997f..c75d3aa41ab5 100644 --- a/sdk/messaging/azservicebus/receiver_test.go +++ b/sdk/messaging/azservicebus/receiver_test.go @@ -291,6 +291,45 @@ func TestReceiverPeek(t *testing.T) { require.Empty(t, noMessagesExpected) } +func TestReceiver_RenewMessageLock(t *testing.T) { + client, cleanup, queueName := setupLiveTest(t, nil) + defer cleanup() + + sender, err := client.NewSender(queueName) + require.NoError(t, err) + + err = sender.SendMessage(context.Background(), &Message{ + Body: []byte("hello world"), + }) + require.NoError(t, err) + + receiver, err := client.NewReceiverForQueue(queueName, nil) + require.NoError(t, err) + + messages, err := receiver.ReceiveMessages(context.Background(), 1, nil) + require.NoError(t, err) + + time.Sleep(2 * time.Second) + lockedUntilOld := messages[0].LockedUntil + require.NoError(t, receiver.RenewMessageLock(context.Background(), messages[0])) + + // these should hopefully be unaffected by clock drift since both values come from + // the service's times, not ours. + require.Greater(t, messages[0].LockedUntil.UnixNano(), lockedUntilOld.UnixNano()) + + // try renewing a bogus token + for i := 0; i < len(messages[0].LockToken); i++ { + messages[0].LockToken[i] = 0 + } + + expectedLockBadError := receiver.RenewMessageLock(context.Background(), messages[0]) + // String matching can go away once we fix #15644 + // For now it at least provides the user with good context that something is incorrect about their lock token. + require.Contains(t, expectedLockBadError.Error(), + "status code 410 and description: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue", + "error message from SB comes through") +} + func TestReceiverOptions(t *testing.T) { // defaults receiver := &Receiver{} diff --git a/sdk/messaging/azservicebus/session_receiver.go b/sdk/messaging/azservicebus/session_receiver.go index 706f10b89d08..cc58866d3835 100644 --- a/sdk/messaging/azservicebus/session_receiver.go +++ b/sdk/messaging/azservicebus/session_receiver.go @@ -6,6 +6,7 @@ package azservicebus import ( "context" "fmt" + "time" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal" "github.com/Azure/go-amqp" @@ -15,7 +16,8 @@ import ( type SessionReceiver struct { *Receiver - sessionID *string + sessionID *string + lockedUntil time.Time } // SessionReceiverOptions contains options for the `Client.AcceptSessionForQueue/Subscription` or `Client.AcceptNextSessionForQueue/Subscription` @@ -46,12 +48,13 @@ func toReceiverOptions(sropts *SessionReceiverOptions) *ReceiverOptions { } } -func newSessionReceiver(sessionID *string, ns internal.NamespaceWithNewAMQPLinks, entity *entity, cleanupOnClose func(), options *ReceiverOptions) (*SessionReceiver, error) { +func newSessionReceiver(ctx context.Context, sessionID *string, ns internal.NamespaceWithNewAMQPLinks, entity *entity, cleanupOnClose func(), options *ReceiverOptions) (*SessionReceiver, error) { const sessionFilterName = "com.microsoft:session-filter" const code = uint64(0x00000137000000C) sessionReceiver := &SessionReceiver{ - sessionID: sessionID, + sessionID: sessionID, + lockedUntil: time.Time{}, } var err error @@ -89,15 +92,47 @@ func newSessionReceiver(sessionID *string, ns internal.NamespaceWithNewAMQPLinks return nil, err } + // temp workaround until we expose the session expiration time from the receiver in go-amqp + if err := sessionReceiver.RenewSessionLock(ctx); err != nil { + _ = sessionReceiver.Close(context.Background()) + return nil, err + } + return sessionReceiver, nil } +// SessionID is the session ID for this SessionReceiver. func (sr *SessionReceiver) SessionID() string { // return the ultimately assigned session ID for this link (anonymous will get it from the // link filter options, non-anonymous is set in newSessionReceiver) return *sr.sessionID } +// LockedUntil is the time the lock on this session expires. +// The lock can be renewed using `SessionReceiver.RenewSessionLock`. +func (sr *SessionReceiver) LockedUntil() time.Time { + return sr.lockedUntil +} + +// RenewSessionLock renews this session's lock. The new expiration time is available +// using `LockedUntil`. +func (sr *SessionReceiver) RenewSessionLock(ctx context.Context) error { + _, _, mgmt, _, err := sr.amqpLinks.Get(ctx) + + if err != nil { + return err + } + + newLockedUntil, err := mgmt.RenewSessionLock(ctx, *sr.sessionID) + + if err != nil { + return err + } + + sr.lockedUntil = newLockedUntil + return nil +} + // init ensures the link was created, guaranteeing that we get our expected session lock. func (sr *SessionReceiver) init(ctx context.Context) error { // initialize the links diff --git a/sdk/messaging/azservicebus/session_receiver_test.go b/sdk/messaging/azservicebus/session_receiver_test.go index edd2701ea71d..649fddf4f19b 100644 --- a/sdk/messaging/azservicebus/session_receiver_test.go +++ b/sdk/messaging/azservicebus/session_receiver_test.go @@ -183,6 +183,43 @@ func TestSessionReceiver_nonSessionReceiver(t *testing.T) { require.Contains(t, amqpError.Description, "It is not possible for an entity that requires sessions to create a non-sessionful message receiver.") } +func TestSessionReceiver_RenewSessionLock(t *testing.T) { + client, cleanup, queueName := setupLiveTest(t, &QueueProperties{ + RequiresSession: to.BoolPtr(true), + }) + defer cleanup() + + sessionReceiver, err := client.AcceptSessionForQueue(context.Background(), queueName, "session-1", nil) + require.NoError(t, err) + + sender, err := client.NewSender(queueName) + require.NoError(t, err) + + err = sender.SendMessage(context.Background(), &Message{ + Body: []byte("hello world"), + SessionID: to.StringPtr("session-1"), + }) + require.NoError(t, err) + + messages, err := sessionReceiver.ReceiveMessages(context.Background(), 1, nil) + require.NoError(t, err) + require.NotNil(t, messages) + + // surprisingly this works. Not sure what it accomplishes though. C# has a manual check for it. + // err = sessionReceiver.RenewMessageLock(context.Background(), messages[0]) + // require.NoError(t, err) + + orig := sessionReceiver.LockedUntil() + require.NoError(t, sessionReceiver.RenewSessionLock(context.Background())) + require.Greater(t, sessionReceiver.LockedUntil().UnixNano(), orig.UnixNano()) + + // bogus renewal + sessionReceiver.sessionID = to.StringPtr("bogus") + + err = sessionReceiver.RenewSessionLock(context.Background()) + require.Contains(t, err.Error(), "status code 410 and description: The session lock has expired on the MessageSession") +} + func Test_toReceiverOptions(t *testing.T) { require.Nil(t, toReceiverOptions(nil))