diff --git a/go.mod b/go.mod index 900390fce97b..8dd9623cffd4 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.12 require ( github.com/Azure/azure-amqp-common-go/v3 v3.1.0 github.com/Azure/azure-sdk-for-go v51.1.0+incompatible - github.com/Azure/go-amqp v0.13.6 + github.com/Azure/go-amqp v0.13.7 github.com/Azure/go-autorest/autorest v0.11.18 github.com/Azure/go-autorest/autorest/adal v0.9.13 github.com/Azure/go-autorest/autorest/date v0.3.0 diff --git a/go.sum b/go.sum index 2cd0ecc7f510..9306fe6ccb45 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,8 @@ github.com/Azure/azure-amqp-common-go/v3 v3.1.0/go.mod h1:PBIGdzcO1teYoufTKMcGib github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w= github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-amqp v0.13.0/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1xlxUTs= -github.com/Azure/go-amqp v0.13.6 h1:CWjyY59Iyc1sO/fE/AubMLMWf5id+Uiw/ph0bZzG9Ns= -github.com/Azure/go-amqp v0.13.6/go.mod h1:wbpCKA8tR5MLgRyIu+bb+S6ECdIDdYJ0NlpFE9xsBPI= +github.com/Azure/go-amqp v0.13.7 h1:ukcCtx138ZmOfHbdALuh9yoJhGtOY3+yaKApfzNvhSk= +github.com/Azure/go-amqp v0.13.7/go.mod h1:wbpCKA8tR5MLgRyIu+bb+S6ECdIDdYJ0NlpFE9xsBPI= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest v0.11.3 h1:fyYnmYujkIXUgv88D9/Wo2ybE4Zwd/TmQd5sSI5u2Ws= diff --git a/message.go b/message.go index d9cff0fb57e8..e4328a70397a 100644 --- a/message.go +++ b/message.go @@ -131,6 +131,11 @@ func NewMessage(data []byte) *Message { } } +// getLinkName returns associated link name or empty string if receiver or link is not defined, +func (m *Message) getLinkName() string { + return m.message.GetLinkName() +} + // CompleteAction will notify Azure Service Bus that the message was successfully handled and should be deleted from the // queue func (m *Message) CompleteAction() DispositionAction { diff --git a/operation_constants.go b/operation_constants.go index d50b4c475ca0..4ee48d8b0393 100644 --- a/operation_constants.go +++ b/operation_constants.go @@ -15,4 +15,5 @@ const ( operationFieldName = "operation" lockTokensFieldName = "lock-tokens" serverTimeoutFieldName = vendorPrefix + "server-timeout" + associatedLinkName = "associated-link-name" ) diff --git a/rpc.go b/rpc.go index 1758cfb850d2..d9df102a01aa 100644 --- a/rpc.go +++ b/rpc.go @@ -396,6 +396,7 @@ func (r *rpcClient) RenewLocks(ctx context.Context, messages ...*Message) error ctx, span := startConsumerSpanFromContext(ctx, "sb.RenewLocks") defer span.End() + var linkName string lockTokens := make([]amqp.UUID, 0, len(messages)) for _, m := range messages { if m.LockToken == nil { @@ -405,6 +406,9 @@ func (r *rpcClient) RenewLocks(ctx context.Context, messages ...*Message) error amqpLockToken := amqp.UUID(*m.LockToken) lockTokens = append(lockTokens, amqpLockToken) + if linkName == "" { + linkName = m.getLinkName() + } } if len(lockTokens) < 1 { @@ -420,6 +424,9 @@ func (r *rpcClient) RenewLocks(ctx context.Context, messages ...*Message) error lockTokensFieldName: lockTokens, }, } + if linkName != "" { + renewRequestMsg.ApplicationProperties[associatedLinkName] = linkName + } response, err := r.doRPCWithRetry(ctx, r.ec.ManagementPath(), renewRequestMsg, 3, 1*time.Second) if err != nil {