Skip to content

Commit

Permalink
Added WithDisableCircuitBreaker and WithDisableBusyLoopBreaker op…
Browse files Browse the repository at this point in the history
…tions. These are variants of the now deprecated `DisableCircuitBreaker`

and `DisableBusyLoopBreaker` options. They provide a booling parameter which is more convenient for usage with
code generation and for shimming with configuration.
  • Loading branch information
stewartboyd119 committed Sep 25, 2024
1 parent ef8e6bc commit 358b7b8
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 12 deletions.
6 changes: 6 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

This project adheres to Semantic Versioning.

## 1.3.0 (Sep 25, 2024)

1. Added `WithDisableCircuitBreaker` and `WithDisableBusyLoopBreaker` options. These are variants of the now deprecated `DisableCircuitBreaker`
and `DisableBusyLoopBreaker` options. They provide a booling parameter which is more convenient for usage with
code generation and for shimming with configuration.

## 1.2.0 (Sep 23, 2024)

1. Update to allow subject name specification (not just TopicNameStrategy)
Expand Down
10 changes: 5 additions & 5 deletions test/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func TestWork_Run_CircuitBreaksOnProcessError(t *testing.T) {
zkafka.ConsumerTopicConfig{Topic: topicName},
kproc,
zkafka.CircuitBreakAfter(1), // Circuit breaks after 1 error.
zkafka.WithDisableCircuitBreaker(false),
zkafka.CircuitBreakFor(50*time.Millisecond),
zkafka.WithOnDone(func(ctx context.Context, message *zkafka.Message, err error) {
cnt.Add(1)
Expand Down Expand Up @@ -443,7 +444,7 @@ func TestWork_Run_DisabledCircuitBreakerContinueReadError(t *testing.T) {
w := kwf.Create(
zkafka.ConsumerTopicConfig{Topic: topicName},
&fakeProcessor{},
zkafka.DisableCircuitBreaker(),
zkafka.WithDisableCircuitBreaker(true),
zkafka.WithLifecycleHooks(zkafka.LifecycleHooks{PostFanout: func(ctx context.Context) {
cnt.Add(1)
}}),
Expand Down Expand Up @@ -966,7 +967,7 @@ func TestWork_WithDeadLetterTopic_FailedToGetWriterDoesntPauseProcessing(t *test
},
},
&processor,
zkafka.DisableCircuitBreaker(),
zkafka.WithDisableCircuitBreaker(true),
)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1564,7 +1565,6 @@ func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen
kcp := zkafka_mocks.NewMockClientProvider(ctrl)
kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Return(r, nil).AnyTimes()

//l := zkafka.NoopLogger{}
l := stdLogger{includeDebug: true}
kwf := zkafka.NewWorkFactory(kcp, zkafka.WithLogger(l))

Expand All @@ -1578,7 +1578,7 @@ func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen
return errors.New("an error occurred during processing")
},
},
zkafka.DisableBusyLoopBreaker(),
zkafka.WithDisableBusyLoopBreaker(true),
zkafka.WithLifecycleHooks(zkafka.LifecycleHooks{PostFanout: func(ctx context.Context) {
fanOutCount.Add(1)
}}),
Expand Down Expand Up @@ -2132,7 +2132,7 @@ func BenchmarkWork_Run_CircuitBreaker_DisableBusyLoopBreaker(b *testing.B) {
zkafka.Speedup(10),
zkafka.CircuitBreakAfter(100),
zkafka.CircuitBreakFor(30*time.Millisecond),
zkafka.DisableBusyLoopBreaker(),
zkafka.WithDisableCircuitBreaker(true),
)

ctx, cancel := context.WithTimeout(ctx, time.Second)
Expand Down
2 changes: 1 addition & 1 deletion work.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type Work struct {
// Duration for which a circuit is open. Use CircuitBreakFor to control
cbFor *time.Duration

// Disable circuit breaking. Use DisableCircuitBreaker to control
// Disable circuit breaking. Use WithDisableCircuitBreaker to control
disableCb bool

// Busy loop breaker. When circuit breaker circuit is open, instead of consuming cpu in a busy loop
Expand Down
27 changes: 21 additions & 6 deletions workoption.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,26 @@ func CircuitBreakFor(duration time.Duration) WorkOption {
return circuitBreakForOption{duration: duration}
}

// DisableCircuitBreaker disables the circuit breaker so that it never breaks
// Deprecated: DisableCircuitBreaker disables the circuit breaker so that it never breaks
func DisableCircuitBreaker() WorkOption {
return disableCbOption{}
return WithDisableCircuitBreaker(true)
}

// DisableBusyLoopBreaker disables the busy loop breaker which would block subsequent read calls till the circuit re-closes.
// WithDisableCircuitBreaker allows the user to control whether circuit breaker is disabled or not
func WithDisableCircuitBreaker(isDisabled bool) WorkOption {
return disableCbOption{disabled: isDisabled}
}

// Deprecated: DisableBusyLoopBreaker disables the busy loop breaker which would block subsequent read calls till the circuit re-closes.
// Without blb we see increased cpu usage when circuit is open
func DisableBusyLoopBreaker() WorkOption {
return disableBlbOption{}
return WithDisableBusyLoopBreaker(true)
}

// WithDisableBusyLoopBreaker disables the busy loop breaker which would block subsequent read calls till the circuit re-closes.
// Without blb we see increased cpu usage when circuit is open
func WithDisableBusyLoopBreaker(isDisabled bool) WorkOption {
return disableBlbOption{disabled: isDisabled}
}

// WithOnDone allows you to specify a callback function executed after processing of a kafka message
Expand Down Expand Up @@ -75,7 +86,9 @@ func (c circuitBreakForOption) apply(w *Work) {
}
}

type disableCbOption struct{}
type disableCbOption struct {
disabled bool
}

func (d disableCbOption) apply(w *Work) {
w.disableCb = true
Expand All @@ -99,7 +112,9 @@ func (o lifeCycleOption) apply(w *Work) {
w.lifecycle = o.lh
}

type disableBlbOption struct{}
type disableBlbOption struct {
disabled bool
}

func (d disableBlbOption) apply(w *Work) {
w.blb.disabled = true
Expand Down

0 comments on commit 358b7b8

Please sign in to comment.