Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Circuit breaker too restrictive #5010

Merged
142 changes: 92 additions & 50 deletions engine/access/rpc/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,15 @@ func setupGRPCServer(t *testing.T) *grpc.ClientConn {
return conn
}

// TestCircuitBreakerExecutionNode tests the circuit breaker state changes for execution nodes.
// successCodes lists the non-OK gRPC status codes that the circuit breaker tests
// in this file return from the mocked Ping handler in the
// "treated as a success for circuit breaker" subtests.
// NOTE(review): this appears to mirror the codes accepted by the circuit breaker's
// IsSuccessful callback in manager.go — keep the two lists in sync.
var successCodes = []codes.Code{
codes.Canceled,
codes.InvalidArgument,
codes.NotFound,
codes.Unimplemented,
codes.OutOfRange,
}

// TestCircuitBreakerExecutionNode tests the circuit breaker for execution nodes.
func TestCircuitBreakerExecutionNode(t *testing.T) {
requestTimeout := 500 * time.Millisecond
circuitBreakerRestoreTimeout := 1500 * time.Millisecond
Expand All @@ -812,11 +820,6 @@ func TestCircuitBreakerExecutionNode(t *testing.T) {
en.start(t)
defer en.stop(t)

// Set up the handler mock to not respond within the requestTimeout.
req := &execution.PingRequest{}
resp := &execution.PingResponse{}
en.handler.On("Ping", testifymock.Anything, req).After(2*requestTimeout).Return(resp, nil)

// Create the connection factory.
connectionFactory := new(ConnectionFactoryImpl)

Expand Down Expand Up @@ -852,10 +855,11 @@ func TestCircuitBreakerExecutionNode(t *testing.T) {
client, _, err := connectionFactory.GetExecutionAPIClient(en.listener.Addr().String())
require.NoError(t, err)

ctx := context.Background()
req := &execution.PingRequest{}
resp := &execution.PingResponse{}

// Helper function to make the Ping call to the execution node and measure the duration.
callAndMeasurePingDuration := func() (time.Duration, error) {
callAndMeasurePingDuration := func(ctx context.Context) (time.Duration, error) {
start := time.Now()

// Make the call to the execution node.
Expand All @@ -865,30 +869,51 @@ func TestCircuitBreakerExecutionNode(t *testing.T) {
return time.Since(start), err
}

// Call and measure the duration for the first invocation.
duration, err := callAndMeasurePingDuration()
assert.Equal(t, codes.DeadlineExceeded, status.Code(err))
assert.LessOrEqual(t, requestTimeout, duration)
t.Run("test different states of the circuit breaker", func(t *testing.T) {
ctx := context.Background()

// Set up the handler mock to not respond within the requestTimeout.
en.handler.On("Ping", testifymock.Anything, req).After(2*requestTimeout).Return(resp, nil)

// Call and measure the duration for the first invocation.
duration, err := callAndMeasurePingDuration(ctx)
assert.Equal(t, codes.DeadlineExceeded, status.Code(err))
assert.LessOrEqual(t, requestTimeout, duration)

// Call and measure the duration for the second invocation (circuit breaker state is now "Open").
duration, err = callAndMeasurePingDuration()
assert.Equal(t, gobreaker.ErrOpenState, err)
assert.Greater(t, requestTimeout, duration)
// Call and measure the duration for the second invocation (circuit breaker state is now "Open").
duration, err = callAndMeasurePingDuration(ctx)
assert.Equal(t, gobreaker.ErrOpenState, err)
assert.Greater(t, requestTimeout, duration)

// Reset the mock Ping for the next invocation to return response without delay
en.handler.On("Ping", testifymock.Anything, req).Unset()
en.handler.On("Ping", testifymock.Anything, req).Return(resp, nil)

// Wait until the circuit breaker transitions to the "HalfOpen" state.
time.Sleep(circuitBreakerRestoreTimeout + (500 * time.Millisecond))

// Call and measure the duration for the third invocation (circuit breaker state is now "HalfOpen").
duration, err = callAndMeasurePingDuration(ctx)
assert.Greater(t, requestTimeout, duration)
assert.Equal(t, nil, err)
})

// Reset the mock Ping for the next invocation to return response without delay
en.handler.On("Ping", testifymock.Anything, req).Unset()
en.handler.On("Ping", testifymock.Anything, req).Return(resp, nil)
for _, code := range successCodes {
t.Run(fmt.Sprintf("test error %s treated as a success for circuit breaker ", code.String()), func(t *testing.T) {
ctx := context.Background()

// Wait until the circuit breaker transitions to the "HalfOpen" state.
time.Sleep(circuitBreakerRestoreTimeout + (500 * time.Millisecond))
en.handler.On("Ping", testifymock.Anything, req).Unset()
en.handler.On("Ping", testifymock.Anything, req).Return(nil, status.Error(code, code.String()))

// Call and measure the duration for the third invocation (circuit breaker state is now "HalfOpen").
duration, err = callAndMeasurePingDuration()
assert.Greater(t, requestTimeout, duration)
assert.Equal(t, nil, err)
duration, err := callAndMeasurePingDuration(ctx)
require.Error(t, err)
require.Equal(t, code, status.Code(err))
require.Greater(t, requestTimeout, duration)
})
}
}

// TestCircuitBreakerCollectionNode tests the circuit breaker state changes for collection nodes.
// TestCircuitBreakerCollectionNode tests the circuit breaker for collection nodes.
func TestCircuitBreakerCollectionNode(t *testing.T) {
requestTimeout := 500 * time.Millisecond
circuitBreakerRestoreTimeout := 1500 * time.Millisecond
Expand All @@ -898,11 +923,6 @@ func TestCircuitBreakerCollectionNode(t *testing.T) {
cn.start(t)
defer cn.stop(t)

// Set up the handler mock to not respond within the requestTimeout.
req := &access.PingRequest{}
resp := &access.PingResponse{}
cn.handler.On("Ping", testifymock.Anything, req).After(2*requestTimeout).Return(resp, nil)

// Create the connection factory.
connectionFactory := new(ConnectionFactoryImpl)

Expand Down Expand Up @@ -938,10 +958,11 @@ func TestCircuitBreakerCollectionNode(t *testing.T) {
client, _, err := connectionFactory.GetAccessAPIClient(cn.listener.Addr().String())
assert.NoError(t, err)

ctx := context.Background()
req := &access.PingRequest{}
resp := &access.PingResponse{}

// Helper function to make the Ping call to the collection node and measure the duration.
callAndMeasurePingDuration := func() (time.Duration, error) {
callAndMeasurePingDuration := func(ctx context.Context) (time.Duration, error) {
start := time.Now()

// Make the call to the collection node.
Expand All @@ -951,25 +972,46 @@ func TestCircuitBreakerCollectionNode(t *testing.T) {
return time.Since(start), err
}

// Call and measure the duration for the first invocation.
duration, err := callAndMeasurePingDuration()
assert.Equal(t, codes.DeadlineExceeded, status.Code(err))
assert.LessOrEqual(t, requestTimeout, duration)
t.Run("test different states of the circuit breaker", func(t *testing.T) {
ctx := context.Background()

// Set up the handler mock to not respond within the requestTimeout.
cn.handler.On("Ping", testifymock.Anything, req).After(2*requestTimeout).Return(resp, nil)

// Call and measure the duration for the first invocation.
duration, err := callAndMeasurePingDuration(ctx)
assert.Equal(t, codes.DeadlineExceeded, status.Code(err))
assert.LessOrEqual(t, requestTimeout, duration)

// Call and measure the duration for the second invocation (circuit breaker state is now "Open").
duration, err = callAndMeasurePingDuration(ctx)
assert.Equal(t, gobreaker.ErrOpenState, err)
assert.Greater(t, requestTimeout, duration)

// Call and measure the duration for the second invocation (circuit breaker state is now "Open").
duration, err = callAndMeasurePingDuration()
assert.Equal(t, gobreaker.ErrOpenState, err)
assert.Greater(t, requestTimeout, duration)
// Reset the mock Ping for the next invocation to return response without delay
cn.handler.On("Ping", testifymock.Anything, req).Unset()
cn.handler.On("Ping", testifymock.Anything, req).Return(resp, nil)

// Reset the mock Ping for the next invocation to return response without delay
cn.handler.On("Ping", testifymock.Anything, req).Unset()
cn.handler.On("Ping", testifymock.Anything, req).Return(resp, nil)
// Wait until the circuit breaker transitions to the "HalfOpen" state.
time.Sleep(circuitBreakerRestoreTimeout + (500 * time.Millisecond))

// Wait until the circuit breaker transitions to the "HalfOpen" state.
time.Sleep(circuitBreakerRestoreTimeout + (500 * time.Millisecond))
// Call and measure the duration for the third invocation (circuit breaker state is now "HalfOpen").
duration, err = callAndMeasurePingDuration(ctx)
assert.Greater(t, requestTimeout, duration)
assert.Equal(t, nil, err)
})

for _, code := range successCodes {
t.Run(fmt.Sprintf("test error %s treated as a success for circuit breaker ", code.String()), func(t *testing.T) {
ctx := context.Background()

// Call and measure the duration for the third invocation (circuit breaker state is now "HalfOpen").
duration, err = callAndMeasurePingDuration()
assert.Greater(t, requestTimeout, duration)
assert.Equal(t, nil, err)
cn.handler.On("Ping", testifymock.Anything, req).Unset()
cn.handler.On("Ping", testifymock.Anything, req).Return(nil, status.Error(code, code.String()))

duration, err := callAndMeasurePingDuration(ctx)
require.Error(t, err)
require.Equal(t, code, status.Code(err))
require.Greater(t, requestTimeout, duration)
})
}
}
19 changes: 19 additions & 0 deletions engine/access/rpc/connection/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,25 @@ func (m *Manager) createCircuitBreakerInterceptor() grpc.UnaryClientInterceptor
// MaxRequests defines the max number of concurrent requests while the circuit breaker is in the HalfOpen
// state.
MaxRequests: m.circuitBreakerConfig.MaxRequests,
// IsSuccessful defines gRPC status codes that should be treated as a successful result for the circuit breaker.
IsSuccessful: func(err error) bool {
if se, ok := status.FromError(err); ok {
if se == nil {
return true
}

// There are several error cases that may occur during normal operation and should be considered
// as "successful" from the perspective of the circuit breaker.
return se.Code() == codes.OK ||
Guitarheroua marked this conversation as resolved.
Show resolved Hide resolved
se.Code() == codes.Canceled ||
se.Code() == codes.InvalidArgument ||
se.Code() == codes.NotFound ||
se.Code() == codes.Unimplemented ||
se.Code() == codes.OutOfRange
}

return false
},
})

circuitBreakerInterceptor := func(
Expand Down
Loading