Skip to content

Commit

Permalink
[azservicebus] Allow for renewing message and session locks (#15929)
Browse files Browse the repository at this point in the history
Adding in message lock renewal and session lock renewal.
  • Loading branch information
richardpark-msft authored Oct 27, 2021
1 parent 2f1cf83 commit a79593a
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 11 deletions.
2 changes: 2 additions & 0 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
enough to fit into a single batch.
- Receiving from sessions using a SessionReceiver, created using Client.AcceptSessionFor(Queue|Subscription)
or Client.AcceptNextSessionFor(Queue|Subscription).
- Can now renew a message lock for a ReceivedMessage using Receiver.RenewMessageLock()
- Can now renew a session lock for a SessionReceiver using SessionReceiver.RenewSessionLock()

### Breaking Changes

Expand Down
4 changes: 4 additions & 0 deletions sdk/messaging/azservicebus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (client *Client) NewSender(queueOrTopic string) (*Sender, error) {
func (client *Client) AcceptSessionForQueue(ctx context.Context, queue string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
&sessionID,
client.namespace,
&entity{Queue: queue},
Expand All @@ -187,6 +188,7 @@ func (client *Client) AcceptSessionForQueue(ctx context.Context, queue string, s
func (client *Client) AcceptSessionForSubscription(ctx context.Context, topic string, subscription string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
&sessionID,
client.namespace,
&entity{Topic: topic, Subscription: subscription},
Expand All @@ -210,6 +212,7 @@ func (client *Client) AcceptSessionForSubscription(ctx context.Context, topic st
func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queue string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
nil,
client.namespace,
&entity{Queue: queue},
Expand All @@ -233,6 +236,7 @@ func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queue strin
func (client *Client) AcceptNextSessionForSubscription(ctx context.Context, topic string, subscription string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
nil,
client.namespace,
&entity{Topic: topic, Subscription: subscription},
Expand Down
70 changes: 62 additions & 8 deletions sdk/messaging/azservicebus/internal/mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type MgmtClient interface {

ScheduleMessages(ctx context.Context, enqueueTime time.Time, messages ...*amqp.Message) ([]int64, error)
CancelScheduled(ctx context.Context, seq ...int64) error

RenewLocks(ctx context.Context, linkName string, lockTokens []amqp.UUID) ([]time.Time, error)
RenewSessionLock(ctx context.Context, sessionID string) (time.Time, error)
}

func newMgmtClient(ctx context.Context, links AMQPLinks, ns NamespaceForMgmtClient) (MgmtClient, error) {
Expand Down Expand Up @@ -391,14 +394,10 @@ func (mc *mgmtClient) PeekMessages(ctx context.Context, fromSequenceNumber int64

// RenewLocks renews the locks in a single 'com.microsoft:renew-lock' operation.
// NOTE: this function assumes all the messages received on the same link.
func (mc *mgmtClient) RenewLocks(ctx context.Context, linkName string, lockTokens ...*amqp.UUID) (err error) {
func (mc *mgmtClient) RenewLocks(ctx context.Context, linkName string, lockTokens []amqp.UUID) ([]time.Time, error) {
ctx, span := tracing.StartConsumerSpanFromContext(ctx, tracing.SpanRenewLock, Version)
defer span.End()

if len(lockTokens) == 0 {
return nil
}

renewRequestMsg := &amqp.Message{
ApplicationProperties: map[string]interface{}{
"operation": "com.microsoft:renew-lock",
Expand All @@ -413,18 +412,73 @@ func (mc *mgmtClient) RenewLocks(ctx context.Context, linkName string, lockToken
}

response, err := mc.doRPCWithRetry(ctx, renewRequestMsg, 3, 1*time.Second)

if err != nil {
tab.For(ctx).Error(err)
return err
return nil, err
}

if response.Code != 200 {
err := fmt.Errorf("error renewing locks: %v", response.Description)
tab.For(ctx).Error(err)
return err
return nil, err
}

return nil
// extract the new lock renewal times from the response
// response.Message.

val, ok := response.Message.Value.(map[string]interface{})
if !ok {
return nil, NewErrIncorrectType("Message.Value", map[string]interface{}{}, response.Message.Value)
}

expirations, ok := val["expirations"]

if !ok {
return nil, NewErrIncorrectType("Message.Value[\"expirations\"]", map[string]interface{}{}, response.Message.Value)
}

asTimes, ok := expirations.([]time.Time)

if !ok {
return nil, NewErrIncorrectType("Message.Value[\"expirations\"] as times", map[string]interface{}{}, response.Message.Value)
}

return asTimes, nil
}

// RenewSessionLocks renews a session lock.
func (mc *mgmtClient) RenewSessionLock(ctx context.Context, sessionID string) (time.Time, error) {
body := map[string]interface{}{
"session-id": sessionID,
}

msg := &amqp.Message{
Value: body,
ApplicationProperties: map[string]interface{}{
"operation": "com.microsoft:renew-session-lock",
},
}

resp, err := mc.doRPCWithRetry(ctx, msg, 5, 5*time.Second)

if err != nil {
return time.Time{}, err
}

m, ok := resp.Message.Value.(map[string]interface{})

if !ok {
return time.Time{}, NewErrIncorrectType("Message.Value", map[string]interface{}{}, resp.Message.Value)
}

lockedUntil, ok := m["expiration"].(time.Time)

if !ok {
return time.Time{}, NewErrIncorrectType("Message.Value[\"expiration\"] as times", time.Time{}, resp.Message.Value)
}

return lockedUntil, nil
}

// SendDisposition allows you settle a message using the management link, rather than via your
Expand Down
20 changes: 20 additions & 0 deletions sdk/messaging/azservicebus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,26 @@ func (r *Receiver) receiveMessage(ctx context.Context, options *ReceiveOptions)
return messages[0], nil
}

// RenewLock renews the lock on a message, updating the `LockedUntil` field on `msg`.
func (r *Receiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage) error {
_, _, mgmt, _, err := r.amqpLinks.Get(ctx)

if err != nil {
return err
}

newExpirationTime, err := mgmt.RenewLocks(ctx, msg.rawAMQPMessage.LinkName(), []amqp.UUID{
(amqp.UUID)(msg.LockToken),
})

if err != nil {
return err
}

msg.LockedUntil = &newExpirationTime[0]
return nil
}

// Close permanently closes the receiver.
func (r *Receiver) Close(ctx context.Context) error {
r.cleanupOnClose()
Expand Down
39 changes: 39 additions & 0 deletions sdk/messaging/azservicebus/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,45 @@ func TestReceiverPeek(t *testing.T) {
require.Empty(t, noMessagesExpected)
}

func TestReceiver_RenewMessageLock(t *testing.T) {
client, cleanup, queueName := setupLiveTest(t, nil)
defer cleanup()

sender, err := client.NewSender(queueName)
require.NoError(t, err)

err = sender.SendMessage(context.Background(), &Message{
Body: []byte("hello world"),
})
require.NoError(t, err)

receiver, err := client.NewReceiverForQueue(queueName, nil)
require.NoError(t, err)

messages, err := receiver.ReceiveMessages(context.Background(), 1, nil)
require.NoError(t, err)

time.Sleep(2 * time.Second)
lockedUntilOld := messages[0].LockedUntil
require.NoError(t, receiver.RenewMessageLock(context.Background(), messages[0]))

// these should hopefully be unaffected by clock drift since both values come from
// the service's times, not ours.
require.Greater(t, messages[0].LockedUntil.UnixNano(), lockedUntilOld.UnixNano())

// try renewing a bogus token
for i := 0; i < len(messages[0].LockToken); i++ {
messages[0].LockToken[i] = 0
}

expectedLockBadError := receiver.RenewMessageLock(context.Background(), messages[0])
// String matching can go away once we fix #15644
// For now it at least provides the user with good context that something is incorrect about their lock token.
require.Contains(t, expectedLockBadError.Error(),
"status code 410 and description: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue",
"error message from SB comes through")
}

func TestReceiverOptions(t *testing.T) {
// defaults
receiver := &Receiver{}
Expand Down
41 changes: 38 additions & 3 deletions sdk/messaging/azservicebus/session_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azservicebus
import (
"context"
"fmt"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal"
"github.com/Azure/go-amqp"
Expand All @@ -15,7 +16,8 @@ import (
type SessionReceiver struct {
*Receiver

sessionID *string
sessionID *string
lockedUntil time.Time
}

// SessionReceiverOptions contains options for the `Client.AcceptSessionForQueue/Subscription` or `Client.AcceptNextSessionForQueue/Subscription`
Expand Down Expand Up @@ -46,12 +48,13 @@ func toReceiverOptions(sropts *SessionReceiverOptions) *ReceiverOptions {
}
}

func newSessionReceiver(sessionID *string, ns internal.NamespaceWithNewAMQPLinks, entity *entity, cleanupOnClose func(), options *ReceiverOptions) (*SessionReceiver, error) {
func newSessionReceiver(ctx context.Context, sessionID *string, ns internal.NamespaceWithNewAMQPLinks, entity *entity, cleanupOnClose func(), options *ReceiverOptions) (*SessionReceiver, error) {
const sessionFilterName = "com.microsoft:session-filter"
const code = uint64(0x00000137000000C)

sessionReceiver := &SessionReceiver{
sessionID: sessionID,
sessionID: sessionID,
lockedUntil: time.Time{},
}

var err error
Expand Down Expand Up @@ -89,15 +92,47 @@ func newSessionReceiver(sessionID *string, ns internal.NamespaceWithNewAMQPLinks
return nil, err
}

// temp workaround until we expose the session expiration time from the receiver in go-amqp
if err := sessionReceiver.RenewSessionLock(ctx); err != nil {
_ = sessionReceiver.Close(context.Background())
return nil, err
}

return sessionReceiver, nil
}

// SessionID is the session ID for this SessionReceiver.
func (sr *SessionReceiver) SessionID() string {
// return the ultimately assigned session ID for this link (anonymous will get it from the
// link filter options, non-anonymous is set in newSessionReceiver)
return *sr.sessionID
}

// LockedUntil is the time the lock on this session expires.
// The lock can be renewed using `SessionReceiver.RenewSessionLock`.
func (sr *SessionReceiver) LockedUntil() time.Time {
return sr.lockedUntil
}

// RenewSessionLock renews this session's lock. The new expiration time is available
// using `LockedUntil`.
func (sr *SessionReceiver) RenewSessionLock(ctx context.Context) error {
_, _, mgmt, _, err := sr.amqpLinks.Get(ctx)

if err != nil {
return err
}

newLockedUntil, err := mgmt.RenewSessionLock(ctx, *sr.sessionID)

if err != nil {
return err
}

sr.lockedUntil = newLockedUntil
return nil
}

// init ensures the link was created, guaranteeing that we get our expected session lock.
func (sr *SessionReceiver) init(ctx context.Context) error {
// initialize the links
Expand Down
37 changes: 37 additions & 0 deletions sdk/messaging/azservicebus/session_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,43 @@ func TestSessionReceiver_nonSessionReceiver(t *testing.T) {
require.Contains(t, amqpError.Description, "It is not possible for an entity that requires sessions to create a non-sessionful message receiver.")
}

func TestSessionReceiver_RenewSessionLock(t *testing.T) {
client, cleanup, queueName := setupLiveTest(t, &QueueProperties{
RequiresSession: to.BoolPtr(true),
})
defer cleanup()

sessionReceiver, err := client.AcceptSessionForQueue(context.Background(), queueName, "session-1", nil)
require.NoError(t, err)

sender, err := client.NewSender(queueName)
require.NoError(t, err)

err = sender.SendMessage(context.Background(), &Message{
Body: []byte("hello world"),
SessionID: to.StringPtr("session-1"),
})
require.NoError(t, err)

messages, err := sessionReceiver.ReceiveMessages(context.Background(), 1, nil)
require.NoError(t, err)
require.NotNil(t, messages)

// surprisingly this works. Not sure what it accomplishes though. C# has a manual check for it.
// err = sessionReceiver.RenewMessageLock(context.Background(), messages[0])
// require.NoError(t, err)

orig := sessionReceiver.LockedUntil()
require.NoError(t, sessionReceiver.RenewSessionLock(context.Background()))
require.Greater(t, sessionReceiver.LockedUntil().UnixNano(), orig.UnixNano())

// bogus renewal
sessionReceiver.sessionID = to.StringPtr("bogus")

err = sessionReceiver.RenewSessionLock(context.Background())
require.Contains(t, err.Error(), "status code 410 and description: The session lock has expired on the MessageSession")
}

func Test_toReceiverOptions(t *testing.T) {
require.Nil(t, toReceiverOptions(nil))

Expand Down

0 comments on commit a79593a

Please sign in to comment.