From 9d82129c2e9dcd644dac04b3e0721b85ba23bdbe Mon Sep 17 00:00:00 2001 From: drfish Date: Mon, 21 Dec 2020 22:08:42 +0800 Subject: [PATCH] Only retry with retryable amqp errors for sender --- changelog.md | 17 ++++++++++++ errors.go | 17 ++++++++++++ sender.go | 77 +++++++++++++++++++++++++++++++++++----------------- 3 files changed, 86 insertions(+), 25 deletions(-) diff --git a/changelog.md b/changelog.md index 4c31323..25610ae 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,22 @@ # Change Log +## `v0.10.8` +- only retry with retryable amqp errors for sender [#201](https://github.com/Azure/azure-service-bus-go/issues/201) + +## `v0.10.7` +- add AzureEnvironment namespace option and use its definition [#192](https://github.com/Azure/azure-service-bus-go/issues/192) +- fix for Websocket behind Proxy Issue [#196](https://github.com/Azure/azure-service-bus-go/issues/196) +- fix nil error dereference [#199](https://github.com/Azure/azure-service-bus-go/issues/199) + +## `v0.10.6` +- fix a hang when closing a receiver + +## `v0.10.5` +- recover must rebuild the link atomically [#187](https://github.com/Azure/azure-service-bus-go/issues/187) + +## `v0.10.4` +- updates dependencies to their latest versions + ## `v0.10.3` - Implements DefaultRuleDescription to allow setting a default rule for a subscription. 
diff --git a/errors.go b/errors.go index 59f35a8..79addaf 100644 --- a/errors.go +++ b/errors.go @@ -3,8 +3,25 @@ package servicebus import ( "fmt" "reflect" + "time" "github.com/Azure/azure-amqp-common-go/v3/rpc" + "github.com/Azure/go-amqp" +) + +// Error Conditions +const ( + // Service Bus Errors + errorServerBusy amqp.ErrorCondition = "com.microsoft:server-busy" + errorTimeout amqp.ErrorCondition = "com.microsoft:timeout" + errorOperationCancelled amqp.ErrorCondition = "com.microsoft:operation-cancelled" + errorContainerClose amqp.ErrorCondition = "com.microsoft:container-close" +) + +const ( + amqpRetryDefaultTimes int = 3 + amqpRetryDefaultDelay time.Duration = time.Second + amqpRetryBusyServerDelay time.Duration = 10 * time.Second ) type ( diff --git a/sender.go b/sender.go index a886fdd..14debc5 100644 --- a/sender.go +++ b/sender.go @@ -24,6 +24,7 @@ package servicebus import ( "context" + "errors" "sync" "time" @@ -234,31 +235,10 @@ func (s *Sender) trySend(ctx context.Context, evt eventer) error { switch err.(type) { case *amqp.Error, *amqp.DetachError: - tab.For(ctx).Debug("recovering connection") - _, retryErr := common.Retry(10, 10*time.Second, func() (interface{}, error) { - ctx, sp := s.startProducerSpanFromContext(ctx, "sb.Sender.trySend.tryRecover") - defer sp.End() - - err := s.Recover(ctx) - if err == nil { - tab.For(ctx).Debug("recovered connection") - return nil, nil - } - - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - return nil, common.Retryable(err.Error()) - } - }) - - if retryErr != nil { - tab.For(ctx).Debug("sender recovering retried, but error was unrecoverable") - if err := s.Close(ctx); err != nil { - tab.For(ctx).Error(err) - } - return retryErr + err = s.handleAMQPError(ctx, err) + if err != nil { + tab.For(ctx).Error(err) + return err } default: tab.For(ctx).Error(err) @@ -268,6 +248,53 @@ func (s *Sender) trySend(ctx context.Context, evt eventer) error { } } +// handleAMQPError is called internally when an 
event has failed to send so we
+// can parse the error to determine whether we should attempt to retry sending the event again.
+// It returns nil once the sender has been recovered, or the error that trySend should report.
+func (s *Sender) handleAMQPError(ctx context.Context, err error) error {
+	var amqpError *amqp.Error
+	if errors.As(err, &amqpError) {
+		switch amqpError.Condition {
+		case errorServerBusy: // the only condition that uses the longer busy-server delay
+			return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryBusyServerDelay)
+		case errorTimeout, errorOperationCancelled, errorContainerClose:
+			return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryDefaultDelay)
+		default: // any other AMQP condition is handed back without a recovery attempt
+			return err
+		}
+	}
+	return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryDefaultDelay) // no *amqp.Error found in err
+}
+
+// retryRetryableAmqpError runs s.Recover under common.Retry, configured with the given times and delay.
+// A failed Recover is reported to common.Retry as common.Retryable unless ctx is done, in which case
+// ctx.Err() is returned instead. The result is nil once Recover succeeds, else common.Retry's error.
+func (s *Sender) retryRetryableAmqpError(ctx context.Context, times int, delay time.Duration) error {
+	tab.For(ctx).Debug("recovering sender connection")
+	_, retryErr := common.Retry(times, delay, func() (interface{}, error) {
+		ctx, sp := s.startProducerSpanFromContext(ctx, "sb.Sender.trySend.tryRecover")
+		defer sp.End()
+
+		err := s.Recover(ctx)
+		if err == nil {
+			tab.For(ctx).Debug("recovered connection")
+			return nil, nil
+		}
+
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err() // plain error, not marked common.Retryable
+		default:
+			return nil, common.Retryable(err.Error())
+		}
+	})
+	if retryErr != nil {
+		tab.For(ctx).Debug("sender recovering retried, but error was unrecoverable")
+		return retryErr
+	}
+	return nil
+}
+
 func (s *Sender) connClosedError(ctx context.Context) error {
 	name := "Sender"
 	if s.Name != "" {