From 358b7b8cd734754692d5834d905962ef08ccfa8b Mon Sep 17 00:00:00 2001 From: stewartboyd119 Date: Wed, 25 Sep 2024 08:55:30 -0700 Subject: [PATCH] Added `WithDisableCircuitBreaker` and `WithDisableBusyLoopBreaker` options. These are variants of the now deprecated `DisableCircuitBreaker` and `DisableBusyLoopBreaker` options. They provide a booling parameter which is more convenient for usage with code generation and for shimming with configuration. --- changelog.md | 6 ++++++ test/worker_test.go | 10 +++++----- work.go | 2 +- workoption.go | 27 +++++++++++++++++++++------ 4 files changed, 33 insertions(+), 12 deletions(-) diff --git a/changelog.md b/changelog.md index 46a8c8a..1936597 100644 --- a/changelog.md +++ b/changelog.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. This project adheres to Semantic Versioning. +## 1.3.0 (Sep 25, 2024) + +1. Added `WithDisableCircuitBreaker` and `WithDisableBusyLoopBreaker` options. These are variants of the now deprecated `DisableCircuitBreaker` +and `DisableBusyLoopBreaker` options. They provide a booling parameter which is more convenient for usage with +code generation and for shimming with configuration. + ## 1.2.0 (Sep 23, 2024) 1. Update to allow subject name specification (not just TopicNameStrategy) diff --git a/test/worker_test.go b/test/worker_test.go index a16292c..b32e096 100644 --- a/test/worker_test.go +++ b/test/worker_test.go @@ -203,6 +203,7 @@ func TestWork_Run_CircuitBreaksOnProcessError(t *testing.T) { zkafka.ConsumerTopicConfig{Topic: topicName}, kproc, zkafka.CircuitBreakAfter(1), // Circuit breaks after 1 error. + zkafka.WithDisableCircuitBreaker(false), zkafka.CircuitBreakFor(50*time.Millisecond), zkafka.WithOnDone(func(ctx context.Context, message *zkafka.Message, err error) { cnt.Add(1) @@ -443,7 +444,7 @@ func TestWork_Run_DisabledCircuitBreakerContinueReadError(t *testing.T) { w := kwf.Create( zkafka.ConsumerTopicConfig{Topic: topicName}, &fakeProcessor{}, - zkafka.DisableCircuitBreaker(), + zkafka.WithDisableCircuitBreaker(true), zkafka.WithLifecycleHooks(zkafka.LifecycleHooks{PostFanout: func(ctx context.Context) { cnt.Add(1) }}), @@ -966,7 +967,7 @@ func TestWork_WithDeadLetterTopic_FailedToGetWriterDoesntPauseProcessing(t *test }, }, &processor, - zkafka.DisableCircuitBreaker(), + zkafka.WithDisableCircuitBreaker(true), ) ctx, cancel := context.WithCancel(context.Background()) @@ -1564,7 +1565,6 @@ func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen kcp := zkafka_mocks.NewMockClientProvider(ctrl) kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Return(r, nil).AnyTimes() - //l := zkafka.NoopLogger{} l := stdLogger{includeDebug: true} kwf := zkafka.NewWorkFactory(kcp, zkafka.WithLogger(l)) @@ -1578,7 +1578,7 @@ func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen return errors.New("an error occurred during processing") }, }, - zkafka.DisableBusyLoopBreaker(), + zkafka.WithDisableBusyLoopBreaker(true), zkafka.WithLifecycleHooks(zkafka.LifecycleHooks{PostFanout: func(ctx context.Context) { fanOutCount.Add(1) }}), @@ -2132,7 +2132,7 @@ func BenchmarkWork_Run_CircuitBreaker_DisableBusyLoopBreaker(b *testing.B) { zkafka.Speedup(10), zkafka.CircuitBreakAfter(100), zkafka.CircuitBreakFor(30*time.Millisecond), - zkafka.DisableBusyLoopBreaker(), + zkafka.WithDisableCircuitBreaker(true), ) ctx, cancel := context.WithTimeout(ctx, time.Second) diff --git a/work.go b/work.go index cf08523..492b9da 100644 --- a/work.go +++ b/work.go @@ -83,7 +83,7 @@ type Work struct { // Duration for which a circuit is open. Use CircuitBreakFor to control cbFor *time.Duration - // Disable circuit breaking. Use DisableCircuitBreaker to control + // Disable circuit breaking. Use WithDisableCircuitBreaker to control disableCb bool // Busy loop breaker. When circuit breaker circuit is open, instead of consuming cpu in a busy loop diff --git a/workoption.go b/workoption.go index 78b6674..fe94202 100644 --- a/workoption.go +++ b/workoption.go @@ -26,15 +26,26 @@ func CircuitBreakFor(duration time.Duration) WorkOption { return circuitBreakForOption{duration: duration} } -// DisableCircuitBreaker disables the circuit breaker so that it never breaks +// Deprecated: DisableCircuitBreaker disables the circuit breaker so that it never breaks func DisableCircuitBreaker() WorkOption { - return disableCbOption{} + return WithDisableCircuitBreaker(true) } -// DisableBusyLoopBreaker disables the busy loop breaker which would block subsequent read calls till the circuit re-closes. +// WithDisableCircuitBreaker allows the user to control whether circuit breaker is disabled or not +func WithDisableCircuitBreaker(isDisabled bool) WorkOption { + return disableCbOption{disabled: isDisabled} +} + +// Deprecated: DisableBusyLoopBreaker disables the busy loop breaker which would block subsequent read calls till the circuit re-closes. // Without blb we see increased cpu usage when circuit is open func DisableBusyLoopBreaker() WorkOption { - return disableBlbOption{} + return WithDisableBusyLoopBreaker(true) +} + +// WithDisableBusyLoopBreaker disables the busy loop breaker which would block subsequent read calls till the circuit re-closes. +// Without blb we see increased cpu usage when circuit is open +func WithDisableBusyLoopBreaker(isDisabled bool) WorkOption { + return disableBlbOption{disabled: isDisabled} } // WithOnDone allows you to specify a callback function executed after processing of a kafka message @@ -75,7 +86,9 @@ func (c circuitBreakForOption) apply(w *Work) { } } -type disableCbOption struct{} +type disableCbOption struct { + disabled bool +} func (d disableCbOption) apply(w *Work) { w.disableCb = true @@ -99,7 +112,9 @@ func (o lifeCycleOption) apply(w *Work) { w.lifecycle = o.lh } -type disableBlbOption struct{} +type disableBlbOption struct { + disabled bool +} func (d disableBlbOption) apply(w *Work) { w.blb.disabled = true