Skip to content

Commit

Permalink
Add associate-link-name property to RenewLocks function. (Azure#225)
Browse files Browse the repository at this point in the history
* Add associated-link-name property to RenewLocks message. Fix connection idle timeout for messages with >=10minsprocessing time.

* Upgrade go-amqp

* Make message.getLinkName package level function

* only set associated link name if available

Co-authored-by: Robert Zakrzewski <Robert.Zakrzewski@tomtom.com>
Co-authored-by: Joel Hendrix <jhendrix@microsoft.com>
  • Loading branch information
3 people authored May 6, 2021
1 parent 73da336 commit 319bf88
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 3 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.12
require (
github.com/Azure/azure-amqp-common-go/v3 v3.1.0
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible
github.com/Azure/go-amqp v0.13.6
github.com/Azure/go-amqp v0.13.7
github.com/Azure/go-autorest/autorest v0.11.18
github.com/Azure/go-autorest/autorest/adal v0.9.13
github.com/Azure/go-autorest/autorest/date v0.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ github.com/Azure/azure-amqp-common-go/v3 v3.1.0/go.mod h1:PBIGdzcO1teYoufTKMcGib
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w=
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/go-amqp v0.13.0/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1xlxUTs=
github.com/Azure/go-amqp v0.13.6 h1:CWjyY59Iyc1sO/fE/AubMLMWf5id+Uiw/ph0bZzG9Ns=
github.com/Azure/go-amqp v0.13.6/go.mod h1:wbpCKA8tR5MLgRyIu+bb+S6ECdIDdYJ0NlpFE9xsBPI=
github.com/Azure/go-amqp v0.13.7 h1:ukcCtx138ZmOfHbdALuh9yoJhGtOY3+yaKApfzNvhSk=
github.com/Azure/go-amqp v0.13.7/go.mod h1:wbpCKA8tR5MLgRyIu+bb+S6ECdIDdYJ0NlpFE9xsBPI=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.11.3 h1:fyYnmYujkIXUgv88D9/Wo2ybE4Zwd/TmQd5sSI5u2Ws=
Expand Down
5 changes: 5 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ func NewMessage(data []byte) *Message {
}
}

// getLinkName returns associated link name or empty string if receiver or link is not defined,
func (m *Message) getLinkName() string {
return m.message.GetLinkName()
}

// CompleteAction will notify Azure Service Bus that the message was successfully handled and should be deleted from the
// queue
func (m *Message) CompleteAction() DispositionAction {
Expand Down
1 change: 1 addition & 0 deletions operation_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ const (
operationFieldName = "operation"
lockTokensFieldName = "lock-tokens"
serverTimeoutFieldName = vendorPrefix + "server-timeout"
associatedLinkName = "associated-link-name"
)
7 changes: 7 additions & 0 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ func (r *rpcClient) RenewLocks(ctx context.Context, messages ...*Message) error
ctx, span := startConsumerSpanFromContext(ctx, "sb.RenewLocks")
defer span.End()

var linkName string
lockTokens := make([]amqp.UUID, 0, len(messages))
for _, m := range messages {
if m.LockToken == nil {
Expand All @@ -405,6 +406,9 @@ func (r *rpcClient) RenewLocks(ctx context.Context, messages ...*Message) error

amqpLockToken := amqp.UUID(*m.LockToken)
lockTokens = append(lockTokens, amqpLockToken)
if linkName == "" {
linkName = m.getLinkName()
}
}

if len(lockTokens) < 1 {
Expand All @@ -420,6 +424,9 @@ func (r *rpcClient) RenewLocks(ctx context.Context, messages ...*Message) error
lockTokensFieldName: lockTokens,
},
}
if linkName != "" {
renewRequestMsg.ApplicationProperties[associatedLinkName] = linkName
}

response, err := r.doRPCWithRetry(ctx, r.ec.ManagementPath(), renewRequestMsg, 3, 1*time.Second)
if err != nil {
Expand Down

0 comments on commit 319bf88

Please sign in to comment.