Skip to content

Commit

Permalink
[azservicebus] Stress test improvements (Azure#19668)
Browse files Browse the repository at this point in the history
* Stress test improvements:
- Fixing the panic that happens if you're not using a topic!
- Adding in a chaos run for infiniteSendAndReceive that can be used as an example for others.

Also, fixing some issues in CI that cropped up around areas that probably needed some garbage cleanup:
* Fixing the inconsistent logging and concurrency issues with the old style of log capture: a single log listener function now stays in place, and tests just swap out the channel it writes to.
Also, fix the circular references between test and utils.
* Eliminate the non-determinism in a message releaser test by only returning a single element, and then blocking until cancel (more realistic behavior)
  • Loading branch information
richardpark-msft authored Dec 8, 2022
1 parent f8f2c90 commit 94de68c
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 121 deletions.
Binary file not shown.
29 changes: 0 additions & 29 deletions sdk/messaging/azservicebus/internal/stress/job.yaml

This file was deleted.

26 changes: 0 additions & 26 deletions sdk/messaging/azservicebus/internal/stress/sb-network-loss.yaml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ matrix:
infiniteSendAndReceive:
testTarget: infiniteSendAndReceive
memory: "1.5Gi"
infiniteSendAndReceiveWithChaos:
testTarget: infiniteSendAndReceive
# this value is injected as a label value in templates/deploy-job.yaml
# this'll activate our standard chaos policy, which is at the bottom of that file.
chaos: "true"
memory: "1.5Gi"
longRunningRenewLock:
testTarget: longRunningRenewLock
memory: "1.5Gi"
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/internal/stress/shared/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func MustCreateSubscriptions(sc *StressContext, topicName string, subscriptionNa

var topicOpts *admin.CreateTopicOptions

if options.Topic != nil {
if options != nil && options.Topic != nil {
topicOpts = options.Topic
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{{- define "stress.deploy-example" -}}
metadata:
labels:
testName: "gosb"
chaos: "{{ default false .Stress.chaos }}"
spec:
# uncomment to deploy to the southeastasia region.
# nodeSelector:
Expand All @@ -22,7 +22,7 @@ spec:
# just uses 'limits' for both.
resources:
limits:
memory: {{.Stress.memory}}
memory: {{.Stress.memory }}
cpu: "1"
args:
- "tests"
Expand All @@ -31,3 +31,37 @@ spec:
{{- include "stress-test-addons.container-env" . | nindent 6 }}
{{- end -}}

{{- include "stress-test-addons.chaos-wrapper.tpl" (list . "stress.network-chaos") -}}
{{- define "stress.network-chaos" -}}
# every 5 minutes, drop 100% of the network traffic going to the Service Bus endpoint for 10 seconds
kind: Schedule
apiVersion: chaos-mesh.org/v1alpha1
spec:
schedule: "*/5 * * * *"
startingDeadlineSeconds: null
concurrencyPolicy: Forbid
historyLimit: 1
type: NetworkChaos
networkChaos:
selector:
namespaces:
- "{{ .Release.Namespace }}"
labelSelectors:
scenario: {{ .Stress.Scenario }}
mode: all
action: loss
duration: 10s
loss:
loss: '100'
correlation: '100'
direction: to
target:
selector:
namespaces:
- {{ .Release.Namespace }}
labelSelectors:
scenario: {{ .Stress.Scenario }}
mode: all
externalTargets:
- {{ .Stress.BaseName }}.servicebus.windows.net
{{- end -}}
63 changes: 43 additions & 20 deletions sdk/messaging/azservicebus/internal/test/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
"math/rand"
"net/http"
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/atom"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils"
"github.com/stretchr/testify/require"
)

Expand All @@ -25,6 +25,7 @@ var (
)

func init() {
addSwappableLogger()
rand.Seed(time.Now().Unix())
}

Expand Down Expand Up @@ -78,8 +79,7 @@ func CreateExpiringQueue(t *testing.T, qd *atom.QueueDescription) (string, func(
qd = &atom.QueueDescription{}
}

deleteAfter := 5 * time.Minute
qd.AutoDeleteOnIdle = utils.DurationToStringPtr(&deleteAfter)
qd.AutoDeleteOnIdle = to.Ptr("PT5M")

env := atom.WrapWithQueueEnvelope(qd, em.TokenProvider())

Expand All @@ -94,6 +94,23 @@ func CreateExpiringQueue(t *testing.T, qd *atom.QueueDescription) (string, func(
}
}

// LoggingChannelValue holds a *chan string: the channel that log messages are
// currently forwarded to. CaptureLogsForTestWithChannel stores into it and the
// listener installed by addSwappableLogger loads from it on every log event.
var LoggingChannelValue atomic.Value

// addSwappableLogger installs a single azlog listener that forwards each log
// event, formatted as "[<event>] <message>", to whichever channel is currently
// stored in LoggingChannelValue. Nothing is forwarded while no channel has been
// stored. The send is non-blocking, so a message is dropped if the channel
// cannot accept it immediately.
//
// NOTE(review): the non-blocking select does not protect against sending on a
// closed channel. It looks like the cleanup function returned by
// CaptureLogsForTestWithChannel closes the channel while it is still stored in
// LoggingChannelValue - confirm no log events can arrive after cleanup.
func addSwappableLogger() {
	forward := func(event azlog.Event, msg string) {
		sink, isChanPtr := LoggingChannelValue.Load().(*chan string)

		if isChanPtr && sink != nil {
			line := fmt.Sprintf("[%s] %s", event, msg)

			select {
			case *sink <- line:
			default:
				// channel can't take the message right now; drop it rather than block the logger.
			}
		}
	}

	azlog.SetListener(forward)
}

// CaptureLogsForTest starts capturing log messages into an
// internal channel.
// Returns a function that ends log capturing and returns any captured messages.
Expand All @@ -107,22 +124,22 @@ func CreateExpiringQueue(t *testing.T, qd *atom.QueueDescription) (string, func(
// messages := endCapture()
// /* do inspection of log messages */
func CaptureLogsForTest() func() []string {
messagesCh := make(chan string, 10000)
return CaptureLogsForTestWithChannel(messagesCh)
return CaptureLogsForTestWithChannel(nil)
}

func CaptureLogsForTestWithChannel(messagesCh chan string) func() []string {
setAzLogListener(func(e azlog.Event, s string) {
messagesCh <- fmt.Sprintf("[%s] %s", e, s)
})
if messagesCh == nil {
messagesCh = make(chan string, 10000)
}

LoggingChannelValue.Store(&messagesCh)

return func() []string {
if messagesCh == nil {
// already been closed, probably manually.
return nil
}

setAzLogListener(nil)
close(messagesCh)

var messages []string
Expand All @@ -137,16 +154,22 @@ func CaptureLogsForTestWithChannel(messagesCh chan string) func() []string {
}

// EnableStdoutLogging turns on logging to stdout for diagnostics.
func EnableStdoutLogging() {
setAzLogListener(func(e azlog.Event, s string) {
log.Printf("%s %s", e, s)
})
}
func EnableStdoutLogging() func() {
ch := make(chan string, 10000)
cleanupLogs := CaptureLogsForTestWithChannel(ch)

var logMu sync.Mutex
doneCh := make(chan struct{})

func setAzLogListener(listener func(e azlog.Event, s string)) {
logMu.Lock()
defer logMu.Unlock()
azlog.SetListener(listener)
go func() {
defer close(doneCh)

for msg := range ch {
log.Printf("%s", msg)
}
}()

return func() {
_ = cleanupLogs()
<-doneCh
}
}
61 changes: 34 additions & 27 deletions sdk/messaging/azservicebus/internal/utils/retrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/go-amqp"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/test"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -290,16 +291,8 @@ func TestCalcDelay(t *testing.T) {
}

func TestRetryLogging(t *testing.T) {
var logs []string

azlog.SetListener(func(e azlog.Event, s string) {
logs = append(logs, fmt.Sprintf("[%-10s] %s", e, s))
})

defer azlog.SetListener(nil)

t.Run("normal error", func(t *testing.T) {
logs = nil
logsFn := test.CaptureLogsForTest()

err := Retry(context.Background(), testLogEvent, "my_operation", func(ctx context.Context, args *RetryFnArgs) error {
azlog.Writef("TestFunc", "Attempt %d, within test func, returning error hello", args.I)
Expand All @@ -312,25 +305,40 @@ func TestRetryLogging(t *testing.T) {
require.EqualError(t, err, "hello")

require.Equal(t, []string{
"[TestFunc ] Attempt 0, within test func, returning error hello",
"[TestFunc] Attempt 0, within test func, returning error hello",
"[testLogEvent] (my_operation) Retry attempt 0 returned retryable error: hello",

"[testLogEvent] (my_operation) Retry attempt 1 sleeping for <time elided>",
"[TestFunc ] Attempt 1, within test func, returning error hello",
"[TestFunc] Attempt 1, within test func, returning error hello",
"[testLogEvent] (my_operation) Retry attempt 1 returned retryable error: hello",

"[testLogEvent] (my_operation) Retry attempt 2 sleeping for <time elided>",
"[TestFunc ] Attempt 2, within test func, returning error hello",
"[TestFunc] Attempt 2, within test func, returning error hello",
"[testLogEvent] (my_operation) Retry attempt 2 returned retryable error: hello",

"[testLogEvent] (my_operation) Retry attempt 3 sleeping for <time elided>",
"[TestFunc ] Attempt 3, within test func, returning error hello",
"[TestFunc] Attempt 3, within test func, returning error hello",
"[testLogEvent] (my_operation) Retry attempt 3 returned retryable error: hello",
}, normalizeRetryLogLines(logs))
}, normalizeRetryLogLines(logsFn()))
})

t.Run("normal error2", func(t *testing.T) {
cleanup := test.EnableStdoutLogging()
defer cleanup()

err := Retry(context.Background(), testLogEvent, "my_operation", func(ctx context.Context, args *RetryFnArgs) error {
azlog.Writef("TestFunc", "Attempt %d, within test func, returning error hello", args.I)
return errors.New("hello")
}, func(err error) bool {
return false
}, exported.RetryOptions{
RetryDelay: time.Microsecond,
})
require.EqualError(t, err, "hello")
})

t.Run("cancellation error", func(t *testing.T) {
logs = nil
logsFn := test.CaptureLogsForTest()

err := Retry(context.Background(), testLogEvent, "test_operation", func(ctx context.Context, args *RetryFnArgs) error {
azlog.Writef("TestFunc",
Expand All @@ -344,13 +352,13 @@ func TestRetryLogging(t *testing.T) {
require.ErrorIs(t, err, context.Canceled)

require.Equal(t, []string{
"[TestFunc ] Attempt 0, within test func",
"[TestFunc] Attempt 0, within test func",
"[testLogEvent] (test_operation) Retry attempt 0 was cancelled, stopping: context canceled",
}, normalizeRetryLogLines(logs))
}, normalizeRetryLogLines(logsFn()))
})

t.Run("custom fatal error", func(t *testing.T) {
logs = nil
logsFn := test.CaptureLogsForTest()

err := Retry(context.Background(), testLogEvent, "test_operation", func(ctx context.Context, args *RetryFnArgs) error {
azlog.Writef("TestFunc",
Expand All @@ -364,14 +372,13 @@ func TestRetryLogging(t *testing.T) {
require.EqualError(t, err, "custom fatal error")

require.Equal(t, []string{
"[TestFunc ] Attempt 0, within test func",
"[TestFunc] Attempt 0, within test func",
"[testLogEvent] (test_operation) Retry attempt 0 returned non-retryable error: custom fatal error",
}, normalizeRetryLogLines(logs))
}, normalizeRetryLogLines(logsFn()))
})

t.Run("with reset attempts", func(t *testing.T) {
logs = nil

logsFn := test.CaptureLogsForTest()
reset := false

err := Retry(context.Background(), testLogEvent, "test_operation", func(ctx context.Context, args *RetryFnArgs) error {
Expand Down Expand Up @@ -399,13 +406,13 @@ func TestRetryLogging(t *testing.T) {
require.Nil(t, err)

require.Equal(t, []string{
"[TestFunc ] Attempt 0, within test func",
"[TestFunc ] Attempt 0, resetting",
"[TestFunc] Attempt 0, within test func",
"[TestFunc] Attempt 0, resetting",
"[testLogEvent] (test_operation) Resetting retry attempts",
"[testLogEvent] (test_operation) Retry attempt -1 returned retryable error: link detached, reason: *Error(nil)",
"[TestFunc ] Attempt 0, within test func",
"[TestFunc ] Attempt 0, return nil",
}, normalizeRetryLogLines(logs))
"[TestFunc] Attempt 0, within test func",
"[TestFunc] Attempt 0, return nil",
}, normalizeRetryLogLines(logsFn()))
})
}

Expand Down
5 changes: 2 additions & 3 deletions sdk/messaging/azservicebus/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,8 @@ func TestReceiverPeek(t *testing.T) {

func TestReceiverDetachWithPeekLock(t *testing.T) {
// NOTE: uncomment this to see some of the background reconnects
// azlog.SetListener(func(e azlog.Event, s string) {
// log.Printf("%s %s", e, s)
// })
// stopFn := test.EnableStdoutLogging()
// defer stopFn()

serviceBusClient, cleanup, queueName := setupLiveTest(t, nil)
defer cleanup()
Expand Down
Loading

0 comments on commit 94de68c

Please sign in to comment.