Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Merge pull request #201 from gavinfish/sender-retry
Browse files Browse the repository at this point in the history
Only retry with retryable amqp errors for sender
  • Loading branch information
catalinaperalta committed Jan 15, 2021
2 parents 705d239 + 9d82129 commit 78c960d
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 25 deletions.
17 changes: 17 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
# Change Log

## `v0.10.8`
- only retry with retryable amqp errors for sender [#201](https://github.com/Azure/azure-service-bus-go/issues/201)

## `v0.10.7`
- add AzureEnvironment namespace option and use its definition [#192](https://github.com/Azure/azure-service-bus-go/issues/192)
- fix for Websocket behind Proxy Issue [#196](https://github.com/Azure/azure-service-bus-go/issues/196)
- fix nil error dereference [#199](https://github.com/Azure/azure-service-bus-go/issues/199)

## `v0.10.6`
- fix a hang when closing a receiver

## `v0.10.5`
- recover must rebuild the link atomically [#187](https://github.com/Azure/azure-service-bus-go/issues/187)

## `v0.10.4`
- updates dependencies to their latest versions

## `v0.10.3`
- Implements DefaultRuleDescription to allow setting a default rule for a subscription.

Expand Down
17 changes: 17 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,25 @@ package servicebus
import (
"fmt"
"reflect"
"time"

"github.com/Azure/azure-amqp-common-go/v3/rpc"
"github.com/Azure/go-amqp"
)

// Error Conditions
const (
// Service Bus Errors
errorServerBusy amqp.ErrorCondition = "com.microsoft:server-busy"
errorTimeout amqp.ErrorCondition = "com.microsoft:timeout"
errorOperationCancelled amqp.ErrorCondition = "com.microsoft:operation-cancelled"
errorContainerClose amqp.ErrorCondition = "com.microsoft:container-close"
)

const (
amqpRetryDefaultTimes int = 3
amqpRetryDefaultDelay time.Duration = time.Second
amqpRetryBusyServerDelay time.Duration = 10 * time.Second
)

type (
Expand Down
77 changes: 52 additions & 25 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package servicebus

import (
"context"
"errors"
"sync"
"time"

Expand Down Expand Up @@ -234,31 +235,10 @@ func (s *Sender) trySend(ctx context.Context, evt eventer) error {

switch err.(type) {
case *amqp.Error, *amqp.DetachError:
tab.For(ctx).Debug("recovering connection")
_, retryErr := common.Retry(10, 10*time.Second, func() (interface{}, error) {
ctx, sp := s.startProducerSpanFromContext(ctx, "sb.Sender.trySend.tryRecover")
defer sp.End()

err := s.Recover(ctx)
if err == nil {
tab.For(ctx).Debug("recovered connection")
return nil, nil
}

select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return nil, common.Retryable(err.Error())
}
})

if retryErr != nil {
tab.For(ctx).Debug("sender recovering retried, but error was unrecoverable")
if err := s.Close(ctx); err != nil {
tab.For(ctx).Error(err)
}
return retryErr
err = s.handleAMQPError(ctx, err)
if err != nil {
tab.For(ctx).Error(err)
return err
}
default:
tab.For(ctx).Error(err)
Expand All @@ -268,6 +248,53 @@ func (s *Sender) trySend(ctx context.Context, evt eventer) error {
}
}

// handleAMQPError is called internally when an event has failed to send so we
// can parse the error to determine whether we should attempt to retry sending the event again.
func (s *Sender) handleAMQPError(ctx context.Context, err error) error {
var amqpError *amqp.Error
if errors.As(err, &amqpError) {
switch amqpError.Condition {
case errorServerBusy:
return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryBusyServerDelay)
case errorTimeout:
return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryDefaultDelay)
case errorOperationCancelled:
return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryDefaultDelay)
case errorContainerClose:
return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryDefaultDelay)
default:
return err
}
}
return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryDefaultDelay)
}

func (s *Sender) retryRetryableAmqpError(ctx context.Context, times int, delay time.Duration) error {
tab.For(ctx).Debug("recovering sender connection")
_, retryErr := common.Retry(times, delay, func() (interface{}, error) {
ctx, sp := s.startProducerSpanFromContext(ctx, "sb.Sender.trySend.tryRecover")
defer sp.End()

err := s.Recover(ctx)
if err == nil {
tab.For(ctx).Debug("recovered connection")
return nil, nil
}

select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return nil, common.Retryable(err.Error())
}
})
if retryErr != nil {
tab.For(ctx).Debug("sender recovering retried, but error was unrecoverable")
return retryErr
}
return nil
}

func (s *Sender) connClosedError(ctx context.Context) error {
name := "Sender"
if s.Name != "" {
Expand Down

0 comments on commit 78c960d

Please sign in to comment.