[Access] Connection pool evictions cause connection failures #4534

Merged
Commits (36)
54670af
Implemented atomic request counter
Guitarheroua Jun 29, 2023
fa9acbc
Implemented graceful close and test.
Guitarheroua Jun 30, 2023
f906baa
Added comments
Guitarheroua Jun 30, 2023
49f751e
Merge branch 'master' into guitarheroua/2833-conn-pool-evictions-caus…
Guitarheroua Jun 30, 2023
3886060
Merge branch 'master' into guitarheroua/2833-conn-pool-evictions-caus…
Guitarheroua Jul 5, 2023
d80f544
Update engine/access/rpc/backend/connection_factory_test.go
Guitarheroua Jul 7, 2023
721dd84
Merge branch 'master' into guitarheroua/2833-conn-pool-evictions-caus…
Guitarheroua Jul 7, 2023
cfdf79a
Merge branch 'master' into guitarheroua/2833-conn-pool-evictions-caus…
Guitarheroua Jul 10, 2023
7b7658e
Fixed remarks
Guitarheroua Jul 11, 2023
9089a1e
Merge branch 'master' into guitarheroua/2833-conn-pool-evictions-caus…
Guitarheroua Jul 17, 2023
02be088
Fixed remarks
Guitarheroua Jul 17, 2023
e2547b6
Added evicting cache client test
Guitarheroua Jul 18, 2023
fb08d0d
Merge branch 'master' into guitarheroua/2833-conn-pool-evictions-caus…
Guitarheroua Jul 18, 2023
7e9e885
Merge branch 'guitarheroua/2833-conn-pool-evictions-cause-conn-failur…
Guitarheroua Jul 18, 2023
8aa1593
Merge branch 'master' into guitarheroua/2833-conn-pool-evictions-caus…
Guitarheroua Jul 18, 2023
163cee6
Merge branch 'guitarheroua/2833-conn-pool-evictions-cause-conn-failur…
Guitarheroua Jul 18, 2023
fbd8446
Added test TestExecutionEvictingCacheClients
Guitarheroua Jul 18, 2023
3412d68
Merge branch 'master' into guitarheroua/2833-conn-pool-evictions-caus…
Guitarheroua Jul 19, 2023
43bea66
Return add to retrieve connection.
Guitarheroua Jul 19, 2023
13c49c8
Merge branch 'guitarheroua/2833-conn-pool-evictions-cause-conn-failur…
Guitarheroua Jul 19, 2023
811f83b
spike to improve locking in grpc connection factory
peterargue Jul 19, 2023
e09ba32
remove cache from factory
peterargue Jul 19, 2023
da22beb
Merge branch 'master' into guitarheroua/2833-conn-pool-evictions-caus…
Guitarheroua Jul 20, 2023
6a857fe
Merge branch 'petera/example-conn-factory-refactor' of github.com:onf…
Guitarheroua Jul 20, 2023
ae9c109
Refactor connection factory
Guitarheroua Jul 20, 2023
85b17f9
Added comments to new components
Guitarheroua Jul 20, 2023
48cae42
Merge branch 'guitarheroua/2833-conn-pool-evictions-cause-conn-failur…
Guitarheroua Jul 20, 2023
1122b8c
removed unnecessary changes
Guitarheroua Jul 20, 2023
fb4e43f
removed unnecessary changes
Guitarheroua Jul 20, 2023
226f053
Merge branch 'master' into guitarheroua/2833-conn-pool-evictions-caus…
Guitarheroua Jul 20, 2023
5240b3b
Merge branch 'master' into guitarheroua/2833-conn-pool-evictions-caus…
Guitarheroua Jul 21, 2023
695ff8c
Fixed review remarks
Guitarheroua Jul 21, 2023
8644def
Merge branch 'master' of github.com:Guitarheroua/flow-go into guitarh…
Guitarheroua Jul 21, 2023
eb6b627
Fixed conflicts
Guitarheroua Jul 21, 2023
46d10fb
Removed work around
Guitarheroua Jul 24, 2023
f7c7116
Merge branch 'master' into guitarheroua/2833-conn-pool-evictions-caus…
Guitarheroua Jul 25, 2023
2 changes: 1 addition & 1 deletion cmd/access/node_builder/access_node_builder.go
@@ -909,7 +909,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.rpcConf.CollectionAddr,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(builder.rpcConf.MaxMsgSize))),
grpc.WithTransportCredentials(insecure.NewCredentials()),
backend.WithClientUnaryInterceptor(builder.rpcConf.CollectionClientTimeout))
backend.WithClientTimeoutOption(builder.rpcConf.CollectionClientTimeout))
if err != nil {
return err
}
4 changes: 2 additions & 2 deletions engine/access/apiproxy/access_api_proxy.go
@@ -65,7 +65,7 @@ func (h *FlowAccessAPIForwarder) reconnectingClient(i int) error {
identity.Address,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(h.maxMsgSize))),
grpc.WithTransportCredentials(insecure.NewCredentials()),
backend.WithClientUnaryInterceptor(timeout))
backend.WithClientTimeoutOption(timeout))
if err != nil {
return err
}
@@ -79,7 +79,7 @@ func (h *FlowAccessAPIForwarder) reconnectingClient(i int) error {
identity.Address,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(h.maxMsgSize))),
grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
backend.WithClientUnaryInterceptor(timeout))
backend.WithClientTimeoutOption(timeout))
if err != nil {
return fmt.Errorf("cannot connect to %s %w", identity.Address, err)
}
124 changes: 105 additions & 19 deletions engine/access/rpc/backend/connection_factory.go
@@ -8,6 +8,10 @@ import (
"sync"
"time"

"go.uber.org/atomic"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

lru "github.com/hashicorp/golang-lru"
"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/execution"
@@ -64,10 +68,30 @@ type ConnectionFactoryImpl struct {
}

type CachedClient struct {
ClientConn *grpc.ClientConn
Address string
mutex sync.Mutex
timeout time.Duration
ClientConn *grpc.ClientConn
Address string
mutex sync.Mutex
timeout time.Duration
requestCounter *atomic.Int32
closeRequested *atomic.Bool
done chan struct{}
}

func (s *CachedClient) UpdateConnection(conn *grpc.ClientConn) {
s.ClientConn = conn
s.closeRequested.Store(false)
s.requestCounter.Store(0)
s.done = make(chan struct{}, 1)
}

func NewCachedClient(timeout time.Duration, address string) *CachedClient {
return &CachedClient{
Address: address,
timeout: timeout,
requestCounter: atomic.NewInt32(0),
closeRequested: atomic.NewBool(false),
done: make(chan struct{}, 1),
}
}

// createConnection creates a new gRPC connection to the remote node
@@ -84,6 +108,17 @@ func (cf *ConnectionFactoryImpl) createConnection(address string, timeout time.D
Timeout: timeout,
}

var connInterceptors []grpc.UnaryClientInterceptor

if cf.ConnectionsCache != nil {
if res, ok := cf.ConnectionsCache.Get(address); ok {
cachedClient := res.(*CachedClient)
connInterceptors = append(connInterceptors, createRequestWatcherInterceptor(cachedClient))
}
}

connInterceptors = append(connInterceptors, createClientTimeoutInterceptor(timeout))

// ClientConn's default KeepAlive on connections is indefinite, assuming the timeout isn't reached
// The connections should be safe to be persisted and reused
// https://pkg.go.dev/google.golang.org/grpc#WithKeepaliveParams
@@ -93,7 +128,8 @@ func (cf *ConnectionFactoryImpl) createConnection(address string, timeout time.D
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(cf.MaxMsgSize))),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepaliveParams),
WithClientUnaryInterceptor(timeout))
grpc.WithChainUnaryInterceptor(connInterceptors...),
)
if err != nil {
return nil, fmt.Errorf("failed to connect to address %s: %w", address, err)
}
@@ -110,17 +146,15 @@ func (cf *ConnectionFactoryImpl) retrieveConnection(grpcAddress string, timeout
store = res.(*CachedClient)
conn = store.ClientConn
} else {
store = &CachedClient{
ClientConn: nil,
Address: grpcAddress,
timeout: timeout,
}
store = NewCachedClient(timeout, grpcAddress)

cf.Log.Debug().Str("cached_client_added", grpcAddress).Msg("adding new cached client to pool")
cf.ConnectionsCache.Add(grpcAddress, store)
if cf.AccessMetrics != nil {
cf.AccessMetrics.ConnectionAddedToPool()
}
}

cf.mutex.Unlock()
store.mutex.Lock()
defer store.mutex.Unlock()
@@ -131,7 +165,7 @@
if err != nil {
return nil, err
}
store.ClientConn = conn
store.UpdateConnection(conn)
if cf.AccessMetrics != nil {
if cacheHit {
cf.AccessMetrics.ConnectionFromPoolUpdated()
@@ -226,16 +260,26 @@ func (cf *ConnectionFactoryImpl) invalidateAPIClient(address string, port uint)
}
}

// Close closes the CachedClient connection
func (s *CachedClient) Close() {
s.mutex.Lock()
conn := s.ClientConn
s.ClientConn = nil
s.mutex.Unlock()

if conn == nil {
return
}
// allow time for any existing requests to finish before closing the connection
time.Sleep(s.timeout + 1*time.Second)

// Mark the connection for closure
s.closeRequested.Store(true)

// If there are ongoing requests, wait for them to complete
if s.requestCounter.Load() > 0 {
<-s.done
}

// Close the connection
conn.Close()
}

@@ -253,8 +297,46 @@ func getGRPCAddress(address string, grpcPort uint) (string, error) {
return grpcAddress, nil
}

func WithClientUnaryInterceptor(timeout time.Duration) grpc.DialOption {
// createRequestWatcherInterceptor creates a request watcher interceptor that waits for unfinished requests before the connection is closed
func createRequestWatcherInterceptor(cachedClient *CachedClient) grpc.UnaryClientInterceptor {
requestWatcherInterceptor := func(
ctx context.Context,
method string,
req interface{},
reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
// Prevent new requests from being sent if the connection is marked for closure
if cachedClient.closeRequested.Load() {
return status.Errorf(codes.Unavailable, "the connection to %s was closed", cachedClient.Address)
Reviewer comment:
I think this is the correct behavior, but ideally, we'd never encounter this error since it's too late to reconnect. Is there a check needed in retrieveConnection to ensure the connection isn't closed before reusing?

If we catch it there, then a new connection can be created.
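
For illustration, such a guard inside retrieveConnection might look like the following (a hypothetical sketch, not part of this PR; it assumes gRPC's connectivity package, google.golang.org/grpc/connectivity, and the conn local from this PR's retrieveConnection):

// Hypothetical guard: a cached connection that has already shut down is
// treated as a cache miss, so the existing re-dial path creates a fresh one.
if conn != nil && conn.GetState() == connectivity.Shutdown {
	conn = nil
}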

}

// Increment the request counter to track ongoing requests
cachedClient.requestCounter.Add(1)

// Invoke the actual RPC method
err := invoker(ctx, method, req, reply, cc, opts...)

// Decrement the request counter; if the connection is marked for closure and no requests remain in flight, signal completion
count := cachedClient.requestCounter.Add(-1)
if cachedClient.closeRequested.Load() && count == 0 {
// Signal that all ongoing requests have completed
select {
case cachedClient.done <- struct{}{}:
default:
}
}

return err
}

return requestWatcherInterceptor
}
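
Taken together with Close above, the interceptor implements a small drain protocol: reject new requests once closure is requested, and signal a done channel when the last in-flight request completes. Below is a minimal, self-contained sketch of the same pattern (illustrative only; the names mirror CachedClient's fields, but this is not the PR's code and it inherits the same check-then-increment race as the interceptor):

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

// gracefulCloser models the closeRequested/requestCounter/done coordination.
type gracefulCloser struct {
	requestCounter *atomic.Int32
	closeRequested *atomic.Bool
	done           chan struct{}
}

// beginRequest reports whether a new request may start.
func (g *gracefulCloser) beginRequest() bool {
	if g.closeRequested.Load() {
		return false // connection is closing; reject new work
	}
	g.requestCounter.Add(1)
	return true
}

// endRequest decrements the counter and, if closure was requested and this
// was the last in-flight request, signals the waiter (non-blocking send).
func (g *gracefulCloser) endRequest() {
	if g.requestCounter.Add(-1) == 0 && g.closeRequested.Load() {
		select {
		case g.done <- struct{}{}:
		default:
		}
	}
}

// close marks the closer as closing and waits for in-flight requests to drain.
func (g *gracefulCloser) close() {
	g.closeRequested.Store(true)
	if g.requestCounter.Load() > 0 {
		<-g.done
	}
}

func main() {
	g := &gracefulCloser{
		requestCounter: atomic.NewInt32(0),
		closeRequested: atomic.NewBool(false),
		done:           make(chan struct{}, 1), // buffered so a late signal never blocks
	}
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if g.beginRequest() {
				defer g.endRequest()
				// ... perform the RPC here ...
			}
		}()
	}
	g.close() // blocks until any started requests have finished
	wg.Wait()
	fmt.Println("drained")
}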

// createClientTimeoutInterceptor creates a client interceptor with a context that expires after the timeout.
func createClientTimeoutInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor {
clientTimeoutInterceptor := func(
ctx context.Context,
method string,
@@ -264,17 +346,21 @@
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {

// create a context that expires after timeout
// Create a context that expires after the specified timeout.
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)

defer cancel()

// call the remote GRPC using the short context
// Call the remote GRPC using the short context.
err := invoker(ctxWithTimeout, method, req, reply, cc, opts...)

return err
}

return grpc.WithUnaryInterceptor(clientTimeoutInterceptor)
return clientTimeoutInterceptor
}

// WithClientTimeoutOption is a helper function to create a GRPC dial option
// with the specified client timeout interceptor.
func WithClientTimeoutOption(timeout time.Duration) grpc.DialOption {
return grpc.WithUnaryInterceptor(createClientTimeoutInterceptor(timeout))
}
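
For callers outside the factory, the renamed helper remains an ordinary dial option, as in access_node_builder.go and access_api_proxy.go above. A minimal sketch (hypothetical address, message size, and timeout; imports: google.golang.org/grpc and google.golang.org/grpc/credentials/insecure):

conn, err := grpc.Dial(
	"execution.example.com:9000", // hypothetical address
	grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024)),
	grpc.WithTransportCredentials(insecure.NewCredentials()),
	backend.WithClientTimeoutOption(3*time.Second),
)
if err != nil {
	return fmt.Errorf("cannot connect: %w", err)
}
defer conn.Close()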
99 changes: 98 additions & 1 deletion engine/access/rpc/backend/connection_factory_test.go
@@ -10,6 +10,9 @@ import (
"testing"
"time"

"go.uber.org/atomic"
"pgregory.net/rapid"

lru "github.com/hashicorp/golang-lru"
"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/execution"
@@ -106,7 +109,7 @@ func TestProxyAccessAPIConnectionReuse(t *testing.T) {
// set the collection grpc port
connectionFactory.CollectionGRPCPort = cn.port
// set the connection pool cache size
cacheSize := 5
cacheSize := 1
cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) {
evictedValue.(*CachedClient).Close()
})
@@ -407,6 +410,100 @@ func TestConnectionPoolStale(t *testing.T) {
assert.Equal(t, resp, expected)
}

// TestExecutionNodeClientClosedGracefully tests the scenario where the execution node client is closed gracefully.
//
// Test Steps:
// - Generate a random number of requests and start goroutines to handle each request.
// - Invalidate the execution API client.
// - Wait for all goroutines to finish.
// - Verify that the number of completed requests matches the number of sent responses.
func TestExecutionNodeClientClosedGracefully(t *testing.T) {
// createExecNode recreates the execution node for each rapid test iteration
createExecNode := func() (*executionNode, func()) {
en := new(executionNode)
en.start(t)
return en, func() {
en.stop(t)
}
}

// Run a rapid property test to check graceful close across different numbers of requests
rapid.Check(t, func(t *rapid.T) {
en, closer := createExecNode()
defer closer()

// set up the handler mock to count each response it sends
req := &execution.PingRequest{}
resp := &execution.PingResponse{}
respSent := atomic.NewUint64(0)
en.handler.On("Ping", testifymock.Anything, req).Run(func(_ testifymock.Arguments) {
respSent.Inc()
}).Return(resp, nil)

// create the factory
connectionFactory := new(ConnectionFactoryImpl)
// set the execution grpc port
connectionFactory.ExecutionGRPCPort = en.port
// set the execution grpc client timeout
connectionFactory.ExecutionNodeGRPCTimeout = time.Second
// set the connection pool cache size
cacheSize := 1
cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) {
evictedValue.(*CachedClient).Close()
})
connectionFactory.ConnectionsCache = cache
connectionFactory.CacheSize = uint(cacheSize)
// set metrics reporting
connectionFactory.AccessMetrics = metrics.NewNoopCollector()

clientAddress := en.listener.Addr().String()
// create the execution API client
client, _, err := connectionFactory.GetExecutionAPIClient(clientAddress)
assert.NoError(t, err)

result, _ := cache.Get(clientAddress)
clientConn := result.(*CachedClient).ClientConn
clientConn.GetState()

ctx := context.Background()

// Generate a random number of requests
nofRequests := rapid.IntRange(10, 100).Draw(t, "nofRequests").(int)
reqCompleted := atomic.NewUint64(0)

var waitGroup sync.WaitGroup

for i := 0; i < nofRequests; i++ {
waitGroup.Add(1)

// send the Ping request from separate goroutines
go func() {
defer waitGroup.Done()
_, err := client.Ping(ctx, req)

if err == nil {
reqCompleted.Inc()
} else {
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unavailable {
return
}
}
fmt.Println("!!! Failed: ", err)
t.Fail()
}
}()
}

// Close connection
connectionFactory.InvalidateExecutionAPIClient(clientAddress)

waitGroup.Wait()

assert.Equal(t, reqCompleted.Load(), respSent.Load())
})
}

// node mocks a flow node that runs a GRPC server
type node struct {
server *grpc.Server