diff --git a/sdk/messaging/azservicebus/internal/stress/go-secret.yaml b/sdk/messaging/azservicebus/internal/stress/go-secret.yaml deleted file mode 100644 index f8f293b41b81..000000000000 Binary files a/sdk/messaging/azservicebus/internal/stress/go-secret.yaml and /dev/null differ diff --git a/sdk/messaging/azservicebus/internal/stress/job.yaml b/sdk/messaging/azservicebus/internal/stress/job.yaml deleted file mode 100644 index 8d204662b2ef..000000000000 --- a/sdk/messaging/azservicebus/internal/stress/job.yaml +++ /dev/null @@ -1,29 +0,0 @@ -apiVersion: batch/v1 -kind: Job -metadata: - name: go-sb-stresstest - namespace: -spec: - backoffLimit: 4 - completions: 1 - parallelism: 1 - template: - spec: - containers: - - env: - - name: SERVICEBUS_CONNECTION_STRING - valueFrom: - secretKeyRef: - key: SERVICEBUS_CONNECTION_STRING - name: go-secret - - name: APPINSIGHTS_INSTRUMENTATIONKEY - valueFrom: - secretKeyRef: - key: APPINSIGHTS_INSTRUMENTATIONKEY - name: go-secret - image: stresstestregistry.azurecr.io//gosbtest - imagePullPolicy: Always - command: - - "/stress" - name: main - restartPolicy: Never \ No newline at end of file diff --git a/sdk/messaging/azservicebus/internal/stress/sb-network-loss.yaml b/sdk/messaging/azservicebus/internal/stress/sb-network-loss.yaml deleted file mode 100644 index a48bbd5f4317..000000000000 --- a/sdk/messaging/azservicebus/internal/stress/sb-network-loss.yaml +++ /dev/null @@ -1,26 +0,0 @@ -{{- include "stress-test-addons.chaos-wrapper.tpl" (list . "stress.azservicebus-network") -}} -{{- define "stress.azservicebus-network" -}} -apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos -spec: - action: loss - direction: to - externalTargets: - # Maps to the service bus resource cname, provided the resource group name, provided - # the service bus namespace uses the resource group name as its name in the bicep template - - "{{ .Stress.ResourceGroupName }}.servicebus.windows.net" - mode: one - selector: - labelSelectors: - # Maps to the test pod, provided it also sets a testInstance label of {{ .Stress.ResourceGroupName }} - testInstance: infiniteSendAndReceive - chaos: "true" - namespaces: - - {{ .Release.Namespace }} - duration: 10s - scheduler: - cron: '*/3 * * * *' - loss: - loss: "100" - correlation: "100" -{{- end -}} diff --git a/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml b/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml index a3edd5f651c6..bfca4a7af5f7 100644 --- a/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml +++ b/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml @@ -34,6 +34,12 @@ matrix: infiniteSendAndReceive: testTarget: infiniteSendAndReceive memory: "1.5Gi" + infiniteSendAndReceiveWithChaos: + testTarget: infiniteSendAndReceive + # this value is injected as a label value in templates/deploy-job.yaml + # this'll activate our standard chaos policy, which is at the bottom of that file. + chaos: "true" + memory: "1.5Gi" longRunningRenewLock: testTarget: longRunningRenewLock memory: "1.5Gi" diff --git a/sdk/messaging/azservicebus/internal/stress/shared/utils.go b/sdk/messaging/azservicebus/internal/stress/shared/utils.go index 6874de617083..10e3e8eaf56b 100644 --- a/sdk/messaging/azservicebus/internal/stress/shared/utils.go +++ b/sdk/messaging/azservicebus/internal/stress/shared/utils.go @@ -88,7 +88,7 @@ func MustCreateSubscriptions(sc *StressContext, topicName string, subscriptionNa var topicOpts *admin.CreateTopicOptions - if options.Topic != nil { + if options != nil && options.Topic != nil { topicOpts = options.Topic } diff --git a/sdk/messaging/azservicebus/internal/stress/templates/deploy-job.yaml b/sdk/messaging/azservicebus/internal/stress/templates/deploy-job.yaml index 2ec01a41a97e..f35d6708afc6 100644 --- a/sdk/messaging/azservicebus/internal/stress/templates/deploy-job.yaml +++ b/sdk/messaging/azservicebus/internal/stress/templates/deploy-job.yaml @@ -2,7 +2,7 @@ {{- define "stress.deploy-example" -}} metadata: labels: - testName: "gosb" + chaos: "{{ default false .Stress.chaos }}" spec: # uncomment to deploy to the southeastasia region. # nodeSelector: @@ -22,7 +22,7 @@ spec: # just uses 'limits' for both. resources: limits: - memory: {{.Stress.memory}} + memory: {{.Stress.memory }} cpu: "1" args: - "tests" @@ -31,3 +31,37 @@ spec: {{- include "stress-test-addons.container-env" . | nindent 6 }} {{- end -}} +{{- include "stress-test-addons.chaos-wrapper.tpl" (list . "stress.network-chaos") -}} +{{- define "stress.network-chaos" -}} +# basically: every 5 minutes do 10s of network loss +kind: Schedule +apiVersion: chaos-mesh.org/v1alpha1 +spec: + schedule: "*/5 * * * *" + startingDeadlineSeconds: null + concurrencyPolicy: Forbid + historyLimit: 1 + type: NetworkChaos + networkChaos: + selector: + namespaces: + - "{{ .Release.Namespace }}" + labelSelectors: + scenario: {{ .Stress.Scenario }} + mode: all + action: loss + duration: 10s + loss: + loss: '100' + correlation: '100' + direction: to + target: + selector: + namespaces: + - {{ .Release.Namespace }} + labelSelectors: + scenario: {{ .Stress.Scenario }} + mode: all + externalTargets: + - {{ .Stress.BaseName }}.servicebus.windows.net +{{- end -}} diff --git a/sdk/messaging/azservicebus/internal/test/test_helpers.go b/sdk/messaging/azservicebus/internal/test/test_helpers.go index 033637c6dcdf..4a52234222a1 100644 --- a/sdk/messaging/azservicebus/internal/test/test_helpers.go +++ b/sdk/messaging/azservicebus/internal/test/test_helpers.go @@ -10,13 +10,13 @@ import ( "math/rand" "net/http" "os" - "sync" + "sync/atomic" "testing" "time" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/atom" - "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils" "github.com/stretchr/testify/require" ) @@ -25,6 +25,7 @@ var ( ) func init() { + addSwappableLogger() rand.Seed(time.Now().Unix()) } @@ -78,8 +79,7 @@ func CreateExpiringQueue(t *testing.T, qd *atom.QueueDescription) (string, func( qd = &atom.QueueDescription{} } - deleteAfter := 5 * time.Minute - qd.AutoDeleteOnIdle = utils.DurationToStringPtr(&deleteAfter) + qd.AutoDeleteOnIdle = to.Ptr("PT5M") env := atom.WrapWithQueueEnvelope(qd, em.TokenProvider()) @@ -94,6 +94,23 @@ func CreateExpiringQueue(t *testing.T, qd *atom.QueueDescription) (string, func( } } +var LoggingChannelValue atomic.Value + +func addSwappableLogger() { + azlog.SetListener(func(e azlog.Event, s string) { + ch, ok := LoggingChannelValue.Load().(*chan string) + + if !ok || ch == nil { + return + } + + select { + case *ch <- fmt.Sprintf("[%s] %s", e, s): + default: + } + }) +} + // CaptureLogsForTest adds a logging listener which captures messages to an // internal channel. // Returns a function that ends log capturing and returns any captured messages. @@ -107,14 +124,15 @@ func CreateExpiringQueue(t *testing.T, qd *atom.QueueDescription) (string, func( // messages := endCapture() // /* do inspection of log messages */ func CaptureLogsForTest() func() []string { - messagesCh := make(chan string, 10000) - return CaptureLogsForTestWithChannel(messagesCh) + return CaptureLogsForTestWithChannel(nil) } func CaptureLogsForTestWithChannel(messagesCh chan string) func() []string { - setAzLogListener(func(e azlog.Event, s string) { - messagesCh <- fmt.Sprintf("[%s] %s", e, s) - }) + if messagesCh == nil { + messagesCh = make(chan string, 10000) + } + + LoggingChannelValue.Store(&messagesCh) return func() []string { if messagesCh == nil { @@ -122,7 +140,6 @@ func CaptureLogsForTestWithChannel(messagesCh chan string) func() []string { return nil } - setAzLogListener(nil) close(messagesCh) var messages []string @@ -137,16 +154,22 @@ func CaptureLogsForTestWithChannel(messagesCh chan string) func() []string { } // EnableStdoutLogging turns on logging to stdout for diagnostics. -func EnableStdoutLogging() { - setAzLogListener(func(e azlog.Event, s string) { - log.Printf("%s %s", e, s) - }) -} +func EnableStdoutLogging() func() { + ch := make(chan string, 10000) + cleanupLogs := CaptureLogsForTestWithChannel(ch) -var logMu sync.Mutex + doneCh := make(chan struct{}) -func setAzLogListener(listener func(e azlog.Event, s string)) { - logMu.Lock() - defer logMu.Unlock() - azlog.SetListener(listener) + go func() { + defer close(doneCh) + + for msg := range ch { + log.Printf("%s", msg) + } + }() + + return func() { + _ = cleanupLogs() + <-doneCh + } } diff --git a/sdk/messaging/azservicebus/internal/utils/retrier_test.go b/sdk/messaging/azservicebus/internal/utils/retrier_test.go index 0d9752a03744..f9ba9a2a3546 100644 --- a/sdk/messaging/azservicebus/internal/utils/retrier_test.go +++ b/sdk/messaging/azservicebus/internal/utils/retrier_test.go @@ -15,6 +15,7 @@ import ( azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/go-amqp" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/test" "github.com/stretchr/testify/require" ) @@ -290,16 +291,8 @@ func TestCalcDelay(t *testing.T) { } func TestRetryLogging(t *testing.T) { - var logs []string - - azlog.SetListener(func(e azlog.Event, s string) { - logs = append(logs, fmt.Sprintf("[%-10s] %s", e, s)) - }) - - defer azlog.SetListener(nil) - t.Run("normal error", func(t *testing.T) { - logs = nil + logsFn := test.CaptureLogsForTest() err := Retry(context.Background(), testLogEvent, "my_operation", func(ctx context.Context, args *RetryFnArgs) error { azlog.Writef("TestFunc", "Attempt %d, within test func, returning error hello", args.I) @@ -312,25 +305,40 @@ func TestRetryLogging(t *testing.T) { require.EqualError(t, err, "hello") require.Equal(t, []string{ - "[TestFunc ] Attempt 0, within test func, returning error hello", + "[TestFunc] Attempt 0, within test func, returning error hello", "[testLogEvent] (my_operation) Retry attempt 0 returned retryable error: hello", "[testLogEvent] (my_operation) Retry attempt 1 sleeping for