Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azservicebus] Allow for renewing message and session locks #15929

Merged
merged 5 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
enough to fit into a single batch.
- Receiving from sessions using a SessionReceiver, created using Client.AcceptSessionFor(Queue|Subscription)
or Client.AcceptNextSessionFor(Queue|Subscription).
- Can now renew a message lock for a ReceivedMessage using Receiver.RenewMessageLock()
- Can now renew a session lock for a SessionReceiver using SessionReceiver.RenewSessionLock()

### Breaking Changes

Expand Down
4 changes: 4 additions & 0 deletions sdk/messaging/azservicebus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (client *Client) NewSender(queueOrTopic string) (*Sender, error) {
func (client *Client) AcceptSessionForQueue(ctx context.Context, queue string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
&sessionID,
client.namespace,
&entity{Queue: queue},
Expand All @@ -187,6 +188,7 @@ func (client *Client) AcceptSessionForQueue(ctx context.Context, queue string, s
func (client *Client) AcceptSessionForSubscription(ctx context.Context, topic string, subscription string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
&sessionID,
client.namespace,
&entity{Topic: topic, Subscription: subscription},
Expand All @@ -210,6 +212,7 @@ func (client *Client) AcceptSessionForSubscription(ctx context.Context, topic st
func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queue string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
nil,
client.namespace,
&entity{Queue: queue},
Expand All @@ -233,6 +236,7 @@ func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queue strin
func (client *Client) AcceptNextSessionForSubscription(ctx context.Context, topic string, subscription string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
nil,
client.namespace,
&entity{Topic: topic, Subscription: subscription},
Expand Down
70 changes: 62 additions & 8 deletions sdk/messaging/azservicebus/internal/mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type MgmtClient interface {

ScheduleMessages(ctx context.Context, enqueueTime time.Time, messages ...*amqp.Message) ([]int64, error)
CancelScheduled(ctx context.Context, seq ...int64) error

RenewLocks(ctx context.Context, linkName string, lockTokens []amqp.UUID) ([]time.Time, error)
RenewSessionLock(ctx context.Context, sessionID string) (time.Time, error)
}

func newMgmtClient(ctx context.Context, links AMQPLinks, ns NamespaceForMgmtClient) (MgmtClient, error) {
Expand Down Expand Up @@ -391,14 +394,10 @@ func (mc *mgmtClient) PeekMessages(ctx context.Context, fromSequenceNumber int64

// RenewLocks renews the locks in a single 'com.microsoft:renew-lock' operation.
// NOTE: this function assumes all the messages received on the same link.
func (mc *mgmtClient) RenewLocks(ctx context.Context, linkName string, lockTokens ...*amqp.UUID) (err error) {
func (mc *mgmtClient) RenewLocks(ctx context.Context, linkName string, lockTokens []amqp.UUID) ([]time.Time, error) {
ctx, span := tracing.StartConsumerSpanFromContext(ctx, tracing.SpanRenewLock, Version)
defer span.End()

if len(lockTokens) == 0 {
return nil
}

renewRequestMsg := &amqp.Message{
ApplicationProperties: map[string]interface{}{
"operation": "com.microsoft:renew-lock",
Expand All @@ -413,18 +412,73 @@ func (mc *mgmtClient) RenewLocks(ctx context.Context, linkName string, lockToken
}

response, err := mc.doRPCWithRetry(ctx, renewRequestMsg, 3, 1*time.Second)

if err != nil {
tab.For(ctx).Error(err)
return err
return nil, err
}

if response.Code != 200 {
err := fmt.Errorf("error renewing locks: %v", response.Description)
tab.For(ctx).Error(err)
return err
return nil, err
}

return nil
// extract the new lock renewal times from the response
// response.Message.

val, ok := response.Message.Value.(map[string]interface{})
if !ok {
return nil, NewErrIncorrectType("Message.Value", map[string]interface{}{}, response.Message.Value)
}

expirations, ok := val["expirations"]

if !ok {
return nil, NewErrIncorrectType("Message.Value[\"expirations\"]", map[string]interface{}{}, response.Message.Value)
}

asTimes, ok := expirations.([]time.Time)

if !ok {
return nil, NewErrIncorrectType("Message.Value[\"expirations\"] as times", map[string]interface{}{}, response.Message.Value)
}

return asTimes, nil
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
}

// RenewSessionLocks renews a session lock.
func (mc *mgmtClient) RenewSessionLock(ctx context.Context, sessionID string) (time.Time, error) {
body := map[string]interface{}{
"session-id": sessionID,
}

msg := &amqp.Message{
Value: body,
ApplicationProperties: map[string]interface{}{
"operation": "com.microsoft:renew-session-lock",
},
}

resp, err := mc.doRPCWithRetry(ctx, msg, 5, 5*time.Second)

if err != nil {
return time.Time{}, err
}

m, ok := resp.Message.Value.(map[string]interface{})

if !ok {
return time.Time{}, NewErrIncorrectType("Message.Value", map[string]interface{}{}, resp.Message.Value)
}

lockedUntil, ok := m["expiration"].(time.Time)

if !ok {
return time.Time{}, NewErrIncorrectType("Message.Value[\"expiration\"] as times", time.Time{}, resp.Message.Value)
}

return lockedUntil, nil
}

// SendDisposition allows you settle a message using the management link, rather than via your
Expand Down
20 changes: 20 additions & 0 deletions sdk/messaging/azservicebus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,26 @@ func (r *Receiver) receiveMessage(ctx context.Context, options *ReceiveOptions)
return messages[0], nil
}

// RenewLock renews the lock on a message, updating the `LockedUntil` field on `msg`.
func (r *Receiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage) error {
_, _, mgmt, _, err := r.amqpLinks.Get(ctx)

if err != nil {
return err
}

newExpirationTime, err := mgmt.RenewLocks(ctx, msg.rawAMQPMessage.LinkName(), []amqp.UUID{
(amqp.UUID)(msg.LockToken),
})

if err != nil {
return err
}

msg.LockedUntil = &newExpirationTime[0]
return nil
}

// Close permanently closes the receiver.
func (r *Receiver) Close(ctx context.Context) error {
r.cleanupOnClose()
Expand Down
39 changes: 39 additions & 0 deletions sdk/messaging/azservicebus/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,45 @@ func TestReceiverPeek(t *testing.T) {
require.Empty(t, noMessagesExpected)
}

func TestReceiver_RenewMessageLock(t *testing.T) {
client, cleanup, queueName := setupLiveTest(t, nil)
defer cleanup()

sender, err := client.NewSender(queueName)
require.NoError(t, err)

err = sender.SendMessage(context.Background(), &Message{
Body: []byte("hello world"),
})
require.NoError(t, err)

receiver, err := client.NewReceiverForQueue(queueName, nil)
require.NoError(t, err)

messages, err := receiver.ReceiveMessages(context.Background(), 1, nil)
require.NoError(t, err)

time.Sleep(2 * time.Second)
lockedUntilOld := messages[0].LockedUntil
require.NoError(t, receiver.RenewMessageLock(context.Background(), messages[0]))

// these should hopefully be unaffected by clock drift since both values come from
// the service's times, not ours.
require.Greater(t, messages[0].LockedUntil.UnixNano(), lockedUntilOld.UnixNano())

// try renewing a bogus token
for i := 0; i < len(messages[0].LockToken); i++ {
messages[0].LockToken[i] = 0
}

expectedLockBadError := receiver.RenewMessageLock(context.Background(), messages[0])
// String matching can go away once we fix #15644
// For now it at least provides the user with good context that something is incorrect about their lock token.
require.Contains(t, expectedLockBadError.Error(),
"status code 410 and description: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue",
"error message from SB comes through")
}

func TestReceiverOptions(t *testing.T) {
// defaults
receiver := &Receiver{}
Expand Down
41 changes: 38 additions & 3 deletions sdk/messaging/azservicebus/session_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azservicebus
import (
"context"
"fmt"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal"
"github.com/Azure/go-amqp"
Expand All @@ -15,7 +16,8 @@ import (
type SessionReceiver struct {
*Receiver

sessionID *string
sessionID *string
lockedUntil time.Time
}

// SessionReceiverOptions contains options for the `Client.AcceptSessionForQueue/Subscription` or `Client.AcceptNextSessionForQueue/Subscription`
Expand Down Expand Up @@ -46,12 +48,13 @@ func toReceiverOptions(sropts *SessionReceiverOptions) *ReceiverOptions {
}
}

func newSessionReceiver(sessionID *string, ns internal.NamespaceWithNewAMQPLinks, entity *entity, cleanupOnClose func(), options *ReceiverOptions) (*SessionReceiver, error) {
func newSessionReceiver(ctx context.Context, sessionID *string, ns internal.NamespaceWithNewAMQPLinks, entity *entity, cleanupOnClose func(), options *ReceiverOptions) (*SessionReceiver, error) {
const sessionFilterName = "com.microsoft:session-filter"
const code = uint64(0x00000137000000C)

sessionReceiver := &SessionReceiver{
sessionID: sessionID,
sessionID: sessionID,
lockedUntil: time.Time{},
}

var err error
Expand Down Expand Up @@ -89,15 +92,47 @@ func newSessionReceiver(sessionID *string, ns internal.NamespaceWithNewAMQPLinks
return nil, err
}

// temp workaround until we expose the session expiration time from the receiver in go-amqp
if err := sessionReceiver.RenewSessionLock(ctx); err != nil {
_ = sessionReceiver.Close(context.Background())
return nil, err
}

return sessionReceiver, nil
}

// SessionID is the session ID for this SessionReceiver.
func (sr *SessionReceiver) SessionID() string {
// return the ultimately assigned session ID for this link (anonymous will get it from the
// link filter options, non-anonymous is set in newSessionReceiver)
return *sr.sessionID
}

// LockedUntil is the time the lock on this session expires.
// The lock can be renewed using `SessionReceiver.RenewSessionLock`.
func (sr *SessionReceiver) LockedUntil() time.Time {
return sr.lockedUntil
}

// RenewSessionLock renews this session's lock. The new expiration time is available
// using `LockedUntil`.
func (sr *SessionReceiver) RenewSessionLock(ctx context.Context) error {
_, _, mgmt, _, err := sr.amqpLinks.Get(ctx)

if err != nil {
return err
}

newLockedUntil, err := mgmt.RenewSessionLock(ctx, *sr.sessionID)

if err != nil {
return err
}

sr.lockedUntil = newLockedUntil
return nil
}

// init ensures the link was created, guaranteeing that we get our expected session lock.
func (sr *SessionReceiver) init(ctx context.Context) error {
// initialize the links
Expand Down
37 changes: 37 additions & 0 deletions sdk/messaging/azservicebus/session_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,43 @@ func TestSessionReceiver_nonSessionReceiver(t *testing.T) {
require.Contains(t, amqpError.Description, "It is not possible for an entity that requires sessions to create a non-sessionful message receiver.")
}

func TestSessionReceiver_RenewSessionLock(t *testing.T) {
client, cleanup, queueName := setupLiveTest(t, &QueueProperties{
RequiresSession: to.BoolPtr(true),
})
defer cleanup()

sessionReceiver, err := client.AcceptSessionForQueue(context.Background(), queueName, "session-1", nil)
require.NoError(t, err)

sender, err := client.NewSender(queueName)
require.NoError(t, err)

err = sender.SendMessage(context.Background(), &Message{
Body: []byte("hello world"),
SessionID: to.StringPtr("session-1"),
})
require.NoError(t, err)

messages, err := sessionReceiver.ReceiveMessages(context.Background(), 1, nil)
require.NoError(t, err)
require.NotNil(t, messages)

// surprisingly this works. Not sure what it accomplishes though. C# has a manual check for it.
// err = sessionReceiver.RenewMessageLock(context.Background(), messages[0])
// require.NoError(t, err)
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved

orig := sessionReceiver.LockedUntil()
require.NoError(t, sessionReceiver.RenewSessionLock(context.Background()))
require.Greater(t, sessionReceiver.LockedUntil().UnixNano(), orig.UnixNano())

// bogus renewal
sessionReceiver.sessionID = to.StringPtr("bogus")

err = sessionReceiver.RenewSessionLock(context.Background())
require.Contains(t, err.Error(), "status code 410 and description: The session lock has expired on the MessageSession")
}

func Test_toReceiverOptions(t *testing.T) {
require.Nil(t, toReceiverOptions(nil))

Expand Down