From ad1966e9eff7c7402faa59bd83dab402893f217e Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Mon, 13 Nov 2023 16:06:55 +0200 Subject: [PATCH 1/5] Added error filtering for CB to pass some gRPC errors. --- engine/access/rpc/connection/manager.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/engine/access/rpc/connection/manager.go b/engine/access/rpc/connection/manager.go index c50a9026748..cea85be679b 100644 --- a/engine/access/rpc/connection/manager.go +++ b/engine/access/rpc/connection/manager.go @@ -368,6 +368,25 @@ func (m *Manager) createCircuitBreakerInterceptor() grpc.UnaryClientInterceptor // MaxRequests defines the max number of concurrent requests while the circuit breaker is in the HalfClosed // state. MaxRequests: m.circuitBreakerConfig.MaxRequests, + // IsSuccessful defines gRPC status codes that should be treated as a successful result for the circuit breaker. + IsSuccessful: func(err error) bool { + if se, ok := status.FromError(err); ok { + if se == nil { + return true + } + + // There are several error cases that may occur during normal operation and should be considered + // as "successful" from the perspective of the circuit breaker. + return se.Code() == codes.OK || + se.Code() == codes.Canceled || + se.Code() == codes.InvalidArgument || + se.Code() == codes.NotFound || + se.Code() == codes.Unimplemented || + se.Code() == codes.OutOfRange + } + + return false + }, }) circuitBreakerInterceptor := func( @@ -379,6 +398,7 @@ func (m *Manager) createCircuitBreakerInterceptor() grpc.UnaryClientInterceptor invoker grpc.UnaryInvoker, opts ...grpc.CallOption, ) error { + var ignoredCBErr error // The circuit breaker integration occurs here, where all invoked calls to the node pass through the // CircuitBreaker.Execute method. This method counts successful and failed invocations, and switches to the // "StateOpen" when the maximum failure threshold is reached. When the circuit breaker is in the "StateOpen" @@ -390,6 +410,11 @@ func (m *Manager) createCircuitBreakerInterceptor() grpc.UnaryClientInterceptor err := invoker(ctx, method, req, reply, cc, opts...) return nil, err }) + + if ignoredCBErr != nil { + return ignoredCBErr + } + return err } From 21419a4f4e2a2decd193321ae6cc6a1efaa2933b Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Mon, 13 Nov 2023 16:09:24 +0200 Subject: [PATCH 2/5] Removed unnecessary code --- engine/access/rpc/connection/manager.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/engine/access/rpc/connection/manager.go b/engine/access/rpc/connection/manager.go index cea85be679b..520f7aee997 100644 --- a/engine/access/rpc/connection/manager.go +++ b/engine/access/rpc/connection/manager.go @@ -398,7 +398,6 @@ func (m *Manager) createCircuitBreakerInterceptor() grpc.UnaryClientInterceptor invoker grpc.UnaryInvoker, opts ...grpc.CallOption, ) error { - var ignoredCBErr error // The circuit breaker integration occurs here, where all invoked calls to the node pass through the // CircuitBreaker.Execute method. This method counts successful and failed invocations, and switches to the // "StateOpen" when the maximum failure threshold is reached. When the circuit breaker is in the "StateOpen" @@ -411,10 +410,6 @@ func (m *Manager) createCircuitBreakerInterceptor() grpc.UnaryClientInterceptor return nil, err }) - if ignoredCBErr != nil { - return ignoredCBErr - } - return err } From 398a8713b51ef7167394fa6469b8fcdd254ee6fa Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Mon, 13 Nov 2023 16:10:15 +0200 Subject: [PATCH 3/5] Removed unnecessary code --- engine/access/rpc/connection/manager.go | 1 - 1 file changed, 1 deletion(-) diff --git a/engine/access/rpc/connection/manager.go b/engine/access/rpc/connection/manager.go index 520f7aee997..3a6b7b1a295 100644 --- a/engine/access/rpc/connection/manager.go +++ b/engine/access/rpc/connection/manager.go @@ -409,7 +409,6 @@ func (m *Manager) createCircuitBreakerInterceptor() grpc.UnaryClientInterceptor err := invoker(ctx, method, req, reply, cc, opts...) return nil, err }) - return err } From c472180256314e259e7e33bda48d5d74b6c2a7ee Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Thu, 23 Nov 2023 15:13:35 +0200 Subject: [PATCH 4/5] Added test to emulate errors treated as success for cb --- .../access/rpc/connection/connection_test.go | 142 ++++++++++++------ 1 file changed, 92 insertions(+), 50 deletions(-) diff --git a/engine/access/rpc/connection/connection_test.go b/engine/access/rpc/connection/connection_test.go index a3e1ee3988c..21595fa8a64 100644 --- a/engine/access/rpc/connection/connection_test.go +++ b/engine/access/rpc/connection/connection_test.go @@ -802,7 +802,15 @@ func setupGRPCServer(t *testing.T) *grpc.ClientConn { return conn } -// TestCircuitBreakerExecutionNode tests the circuit breaker state changes for execution nodes. +var successCodes = []codes.Code{ + codes.Canceled, + codes.InvalidArgument, + codes.NotFound, + codes.Unimplemented, + codes.OutOfRange, +} + +// TestCircuitBreakerExecutionNode tests the circuit breaker for execution nodes. func TestCircuitBreakerExecutionNode(t *testing.T) { requestTimeout := 500 * time.Millisecond circuitBreakerRestoreTimeout := 1500 * time.Millisecond @@ -812,11 +820,6 @@ func TestCircuitBreakerExecutionNode(t *testing.T) { en.start(t) defer en.stop(t) - // Set up the handler mock to not respond within the requestTimeout. - req := &execution.PingRequest{} - resp := &execution.PingResponse{} - en.handler.On("Ping", testifymock.Anything, req).After(2*requestTimeout).Return(resp, nil) - // Create the connection factory. connectionFactory := new(ConnectionFactoryImpl) @@ -852,10 +855,11 @@ func TestCircuitBreakerExecutionNode(t *testing.T) { client, _, err := connectionFactory.GetExecutionAPIClient(en.listener.Addr().String()) require.NoError(t, err) - ctx := context.Background() + req := &execution.PingRequest{} + resp := &execution.PingResponse{} // Helper function to make the Ping call to the execution node and measure the duration. - callAndMeasurePingDuration := func() (time.Duration, error) { + callAndMeasurePingDuration := func(ctx context.Context) (time.Duration, error) { start := time.Now() // Make the call to the execution node. @@ -865,30 +869,51 @@ func TestCircuitBreakerExecutionNode(t *testing.T) { return time.Since(start), err } - // Call and measure the duration for the first invocation. - duration, err := callAndMeasurePingDuration() - assert.Equal(t, codes.DeadlineExceeded, status.Code(err)) - assert.LessOrEqual(t, requestTimeout, duration) + t.Run("test different states of the circuit breaker", func(t *testing.T) { + ctx := context.Background() + + // Set up the handler mock to not respond within the requestTimeout. + en.handler.On("Ping", testifymock.Anything, req).After(2*requestTimeout).Return(resp, nil) + + // Call and measure the duration for the first invocation. + duration, err := callAndMeasurePingDuration(ctx) + assert.Equal(t, codes.DeadlineExceeded, status.Code(err)) + assert.LessOrEqual(t, requestTimeout, duration) - // Call and measure the duration for the second invocation (circuit breaker state is now "Open"). - duration, err = callAndMeasurePingDuration() - assert.Equal(t, gobreaker.ErrOpenState, err) - assert.Greater(t, requestTimeout, duration) + // Call and measure the duration for the second invocation (circuit breaker state is now "Open"). + duration, err = callAndMeasurePingDuration(ctx) + assert.Equal(t, gobreaker.ErrOpenState, err) + assert.Greater(t, requestTimeout, duration) + + // Reset the mock Ping for the next invocation to return response without delay + en.handler.On("Ping", testifymock.Anything, req).Unset() + en.handler.On("Ping", testifymock.Anything, req).Return(resp, nil) + + // Wait until the circuit breaker transitions to the "HalfOpen" state. + time.Sleep(circuitBreakerRestoreTimeout + (500 * time.Millisecond)) + + // Call and measure the duration for the third invocation (circuit breaker state is now "HalfOpen"). + duration, err = callAndMeasurePingDuration(ctx) + assert.Greater(t, requestTimeout, duration) + assert.Equal(t, nil, err) + }) - // Reset the mock Ping for the next invocation to return response without delay - en.handler.On("Ping", testifymock.Anything, req).Unset() - en.handler.On("Ping", testifymock.Anything, req).Return(resp, nil) + for _, code := range successCodes { + t.Run(fmt.Sprintf("test error %s treated as a success for circuit breaker ", code.String()), func(t *testing.T) { + ctx := context.Background() - // Wait until the circuit breaker transitions to the "HalfOpen" state. - time.Sleep(circuitBreakerRestoreTimeout + (500 * time.Millisecond)) + en.handler.On("Ping", testifymock.Anything, req).Unset() + en.handler.On("Ping", testifymock.Anything, req).Return(nil, status.Error(code, code.String())) - // Call and measure the duration for the third invocation (circuit breaker state is now "HalfOpen"). - duration, err = callAndMeasurePingDuration() - assert.Greater(t, requestTimeout, duration) - assert.Equal(t, nil, err) + duration, err := callAndMeasurePingDuration(ctx) + require.Error(t, err) + require.Equal(t, code, status.Code(err)) + require.Greater(t, requestTimeout, duration) + }) + } } -// TestCircuitBreakerCollectionNode tests the circuit breaker state changes for collection nodes. +// TestCircuitBreakerCollectionNode tests the circuit breaker for collection nodes. func TestCircuitBreakerCollectionNode(t *testing.T) { requestTimeout := 500 * time.Millisecond circuitBreakerRestoreTimeout := 1500 * time.Millisecond @@ -898,11 +923,6 @@ func TestCircuitBreakerCollectionNode(t *testing.T) { cn.start(t) defer cn.stop(t) - // Set up the handler mock to not respond within the requestTimeout. - req := &access.PingRequest{} - resp := &access.PingResponse{} - cn.handler.On("Ping", testifymock.Anything, req).After(2*requestTimeout).Return(resp, nil) - // Create the connection factory. connectionFactory := new(ConnectionFactoryImpl) @@ -938,10 +958,11 @@ func TestCircuitBreakerCollectionNode(t *testing.T) { client, _, err := connectionFactory.GetAccessAPIClient(cn.listener.Addr().String()) assert.NoError(t, err) - ctx := context.Background() + req := &access.PingRequest{} + resp := &access.PingResponse{} // Helper function to make the Ping call to the collection node and measure the duration. - callAndMeasurePingDuration := func() (time.Duration, error) { + callAndMeasurePingDuration := func(ctx context.Context) (time.Duration, error) { start := time.Now() // Make the call to the collection node. @@ -951,25 +972,46 @@ func TestCircuitBreakerCollectionNode(t *testing.T) { return time.Since(start), err } - // Call and measure the duration for the first invocation. - duration, err := callAndMeasurePingDuration() - assert.Equal(t, codes.DeadlineExceeded, status.Code(err)) - assert.LessOrEqual(t, requestTimeout, duration) + t.Run("test different states of the circuit breaker", func(t *testing.T) { + ctx := context.Background() + + // Set up the handler mock to not respond within the requestTimeout. + cn.handler.On("Ping", testifymock.Anything, req).After(2*requestTimeout).Return(resp, nil) + + // Call and measure the duration for the first invocation. + duration, err := callAndMeasurePingDuration(ctx) + assert.Equal(t, codes.DeadlineExceeded, status.Code(err)) + assert.LessOrEqual(t, requestTimeout, duration) + + // Call and measure the duration for the second invocation (circuit breaker state is now "Open"). + duration, err = callAndMeasurePingDuration(ctx) + assert.Equal(t, gobreaker.ErrOpenState, err) + assert.Greater(t, requestTimeout, duration) - // Call and measure the duration for the second invocation (circuit breaker state is now "Open"). - duration, err = callAndMeasurePingDuration() - assert.Equal(t, gobreaker.ErrOpenState, err) - assert.Greater(t, requestTimeout, duration) + // Reset the mock Ping for the next invocation to return response without delay + cn.handler.On("Ping", testifymock.Anything, req).Unset() + cn.handler.On("Ping", testifymock.Anything, req).Return(resp, nil) - // Reset the mock Ping for the next invocation to return response without delay - cn.handler.On("Ping", testifymock.Anything, req).Unset() - cn.handler.On("Ping", testifymock.Anything, req).Return(resp, nil) + // Wait until the circuit breaker transitions to the "HalfOpen" state. + time.Sleep(circuitBreakerRestoreTimeout + (500 * time.Millisecond)) - // Wait until the circuit breaker transitions to the "HalfOpen" state. - time.Sleep(circuitBreakerRestoreTimeout + (500 * time.Millisecond)) + // Call and measure the duration for the third invocation (circuit breaker state is now "HalfOpen"). + duration, err = callAndMeasurePingDuration(ctx) + assert.Greater(t, requestTimeout, duration) + assert.Equal(t, nil, err) + }) + + for _, code := range successCodes { + t.Run(fmt.Sprintf("test error %s treated as a success for circuit breaker ", code.String()), func(t *testing.T) { + ctx := context.Background() - // Call and measure the duration for the third invocation (circuit breaker state is now "HalfOpen"). - duration, err = callAndMeasurePingDuration() - assert.Greater(t, requestTimeout, duration) - assert.Equal(t, nil, err) + cn.handler.On("Ping", testifymock.Anything, req).Unset() + cn.handler.On("Ping", testifymock.Anything, req).Return(nil, status.Error(code, code.String())) + + duration, err := callAndMeasurePingDuration(ctx) + require.Error(t, err) + require.Equal(t, code, status.Code(err)) + require.Greater(t, requestTimeout, duration) + }) + } } From 7d598d865803aa173eaef0c949e8f09e9fd625aa Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Mon, 27 Nov 2023 15:52:18 +0200 Subject: [PATCH 5/5] Replaced check with switch --- engine/access/rpc/connection/manager.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/engine/access/rpc/connection/manager.go b/engine/access/rpc/connection/manager.go index 3a6b7b1a295..b9cfc792620 100644 --- a/engine/access/rpc/connection/manager.go +++ b/engine/access/rpc/connection/manager.go @@ -377,12 +377,12 @@ func (m *Manager) createCircuitBreakerInterceptor() grpc.UnaryClientInterceptor // There are several error cases that may occur during normal operation and should be considered // as "successful" from the perspective of the circuit breaker. - return se.Code() == codes.OK || - se.Code() == codes.Canceled || - se.Code() == codes.InvalidArgument || - se.Code() == codes.NotFound || - se.Code() == codes.Unimplemented || - se.Code() == codes.OutOfRange + switch se.Code() { + case codes.OK, codes.Canceled, codes.InvalidArgument, codes.NotFound, codes.Unimplemented, codes.OutOfRange: + return true + default: + return false + } } return false