Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[deployments-k8s#2393] Fix heal monitor client to work with resetting.NewCredentials #1060

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/networkservice/chains/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ func NewClient(ctx context.Context, connectTo *url.URL, clientOpts ...Option) ne
heal.WithOnRestore(heal.OnRestoreRestore),
heal.WithRestoreTimeout(time.Minute)),
clienturl.NewServer(connectTo),
connect.NewServer(ctx, func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
connect.NewServer(ctx, func(ctx context.Context, cc *grpc.ClientConn) networkservice.NetworkServiceClient {
return chain.NewNetworkServiceClient(
append(
opts.additionalFunctionality,
opts.authorizeClient,
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)),
heal.NewClient(ctx, cc),
networkservice.NewNetworkServiceClient(cc),
)...,
)
Expand All @@ -136,7 +136,7 @@ func NewClient(ctx context.Context, connectTo *url.URL, clientOpts ...Option) ne

// NewClientFactory - returns a (3.) case func(cc *grpc.ClientConn) NSM client factory.
func NewClientFactory(clientOpts ...Option) connect.ClientFactory {
return func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
return func(ctx context.Context, cc *grpc.ClientConn) networkservice.NetworkServiceClient {
var rv networkservice.NetworkServiceClient
var opts = &clientOptions{
name: "client-" + uuid.New().String(),
Expand All @@ -153,7 +153,7 @@ func NewClientFactory(clientOpts ...Option) connect.ClientFactory {
refresh.NewClient(ctx),
metadata.NewClient(),
// TODO: move back to the end of the chain once the `begin` chain element is ready
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)),
heal.NewClient(ctx, cc),
}, opts.additionalFunctionality...),
opts.authorizeClient,
networkservice.NewNetworkServiceClient(cc),
Expand Down
3 changes: 3 additions & 0 deletions pkg/networkservice/chains/nsmgr/heal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ func testNSMGRHealNSMgr(t *testing.T, nodeNum int, restored bool) {

domain.Nodes[nodeNum].NSMgr.Cancel()
} else {
domain.Nodes[nodeNum].NSMgr.Cancel()
// We need to be sure that Client has noticed NSMgr death
time.Sleep(3 * time.Second)
domain.Nodes[nodeNum].NSMgr.Restart()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/chains/nsmgr/scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
func TestCreateEndpointDuringRequest(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

defer cancel()
domain := sandbox.NewBuilder(ctx, t).
Expand Down
24 changes: 17 additions & 7 deletions pkg/networkservice/chains/nsmgr/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *nsmgrSuite) SetupSuite() {
// Call cleanup when tests complete
t.Cleanup(func() {
cancel()
goleak.VerifyNone(s.T())
goleak.VerifyNone(s.T(), goleak.IgnoreTopFunction("github.com/networkservicemesh/sdk/pkg/tools/resetting.NewCredentials.func1"))
})

// Create default domain with nodesCount nodes, which will be enough for any test
Expand Down Expand Up @@ -287,6 +287,12 @@ func (s *nsmgrSuite) Test_RemoteUsecase() {
require.Equal(t, 1, counter.Requests())
require.Equal(t, 8, len(conn.Path.PathSegments))

s.domain.Nodes[0].UpdateTokens()
s.domain.Nodes[1].UpdateTokens()

// Make sure that no healing happens
time.Sleep(100 * time.Millisecond)

// Simulate refresh from client
refreshRequest := request.Clone()
refreshRequest.Connection = conn.Clone()
Expand Down Expand Up @@ -325,7 +331,7 @@ func (s *nsmgrSuite) Test_ConnectToDeadNSEUsecase() {

request := defaultRequest(nsReg.Name)

reqCtx, reqCancel := context.WithTimeout(ctx, time.Second)
reqCtx, reqCancel := context.WithTimeout(ctx, 2*time.Second)
defer reqCancel()

conn, err := nsc.Request(reqCtx, request.Clone())
Expand All @@ -340,10 +346,7 @@ func (s *nsmgrSuite) Test_ConnectToDeadNSEUsecase() {
refreshRequest := request.Clone()
refreshRequest.Connection = conn.Clone()

refreshReqCtx, refreshReqCancel := context.WithTimeout(ctx, time.Second)
defer refreshReqCancel()

_, err = nsc.Request(refreshReqCtx, refreshRequest)
_, err = nsc.Request(ctx, refreshRequest)
require.Error(t, err)
require.NoError(t, reqCtx.Err())

Expand Down Expand Up @@ -379,6 +382,11 @@ func (s *nsmgrSuite) Test_LocalUsecase() {
require.Equal(t, 1, counter.Requests())
require.Equal(t, 5, len(conn.Path.PathSegments))

s.domain.Nodes[0].UpdateTokens()

// Make sure that no healing happens
time.Sleep(100 * time.Millisecond)

// Simulate refresh from client
refreshRequest := request.Clone()
refreshRequest.Connection = conn.Clone()
Expand Down Expand Up @@ -673,7 +681,9 @@ func (s *nsmgrSuite) Test_ShouldCleanAllClientAndEndpointGoroutines() {
nsReg, err := s.nsRegistryClient.Register(ctx, defaultRegistryService())
require.NoError(t, err)

defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
defer goleak.VerifyNone(t, goleak.IgnoreCurrent(),
// Starting new NSC, NSE
goleak.IgnoreTopFunction("github.com/networkservicemesh/sdk/pkg/tools/resetting.NewCredentials.func1"))

// At this moment all possible endless NSMgr goroutines have been started. So we expect all newly created goroutines
// to be canceled no later than some of these events:
Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/common/connect/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
)

// ClientFactory is used to create new clients when a new connection is created.
type ClientFactory = func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient
type ClientFactory = func(ctx context.Context, cc *grpc.ClientConn) networkservice.NetworkServiceClient

type connectServer struct {
ctx context.Context
Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/common/connect/server_cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestConnect_CancelDuringRequest(t *testing.T) {
newPassTroughClient(service1Name),
kernel.NewClient()),
)
clientFactory := func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
clientFactory := func(ctx context.Context, cc *grpc.ClientConn) networkservice.NetworkServiceClient {
counter.Add(1)
return standardClientFactory(ctx, cc)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/networkservice/common/connect/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestConnectServer_Request(t *testing.T) {

s := next.NewNetworkServiceServer(
connect.NewServer(context.Background(),
func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
func(ctx context.Context, cc *grpc.ClientConn) networkservice.NetworkServiceClient {
return next.NewNetworkServiceClient(
adapters.NewServerToClient(serverClient),
newMonitorClient(ctx, cc),
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestConnectServer_RequestParallel(t *testing.T) {

s := next.NewNetworkServiceServer(
connect.NewServer(context.Background(),
func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
func(ctx context.Context, cc *grpc.ClientConn) networkservice.NetworkServiceClient {
return next.NewNetworkServiceClient(
serverClient,
newMonitorClient(ctx, cc),
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestConnectServer_RequestFail(t *testing.T) {
// 1. Create connectServer

s := connect.NewServer(context.Background(),
func(_ context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
func(_ context.Context, cc *grpc.ClientConn) networkservice.NetworkServiceClient {
return injecterror.NewClient()
},
connect.WithDialTimeout(time.Second),
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestConnectServer_RequestNextServerError(t *testing.T) {

s := next.NewNetworkServiceServer(
connect.NewServer(context.Background(),
func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
func(ctx context.Context, cc *grpc.ClientConn) networkservice.NetworkServiceClient {
return next.NewNetworkServiceClient(
adapters.NewServerToClient(serverClient),
newMonitorClient(ctx, cc),
Expand Down Expand Up @@ -400,7 +400,7 @@ func TestConnectServer_RemoteRestarted(t *testing.T) {
// 1. Create connectServer

s := connect.NewServer(context.Background(),
func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
func(ctx context.Context, cc *grpc.ClientConn) networkservice.NetworkServiceClient {
return next.NewNetworkServiceClient(
newMonitorClient(ctx, cc),
networkservice.NewNetworkServiceClient(cc),
Expand Down Expand Up @@ -483,7 +483,7 @@ func TestConnectServer_DialTimeout(t *testing.T) {
// 1. Create connectServer

s := connect.NewServer(context.Background(),
func(_ context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
func(_ context.Context, cc *grpc.ClientConn) networkservice.NetworkServiceClient {
return networkservice.NewNetworkServiceClient(cc)
},
connect.WithDialTimeout(100*time.Millisecond),
Expand Down
113 changes: 73 additions & 40 deletions pkg/networkservice/common/heal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/cancelctx"
Expand All @@ -37,46 +38,58 @@ type connectionInfo struct {
}

type healClient struct {
ctx context.Context
cc networkservice.MonitorConnectionClient
ctx context.Context
cancel context.CancelFunc
cc *grpc.ClientConn

conns connectionInfoMap
createMonitorStream func() (networkservice.MonitorConnection_MonitorConnectionsClient, error)
healConnection, restoreConnection requestHealFuncType

initOnce sync.Once
initErr error
conns connectionInfoMap
}

// NewClient - creates a new networkservice.NetworkServiceClient chain element that monitors its connections' state
// and calls heal server in case connection breaks if heal server is present in the chain
// - ctx - context for the lifecycle of the *Client* itself. Cancel when discarding the client.
// - cc - grpc.ClientConn used to create the MonitorConnectionClient that watches for connection confirmations and breakages.
func NewClient(ctx context.Context, cc networkservice.MonitorConnectionClient) networkservice.NetworkServiceClient {
return &healClient{
ctx: ctx,
cc: cc,
func NewClient(ctx context.Context, cc *grpc.ClientConn) networkservice.NetworkServiceClient {
u := &healClient{
ctx: ctx,
cc: cc,
healConnection: func(*networkservice.Connection) {},
restoreConnection: func(*networkservice.Connection) {},
}

if u.cancel = cancelctx.FromContext(ctx); u.cancel == nil {
u.cancel = func() {}
}

return u
}

func (u *healClient) init(ctx context.Context, conn *networkservice.Connection) error {
monitorClient, err := u.cc.MonitorConnections(u.ctx, &networkservice.MonitorScopeSelector{
PathSegments: []*networkservice.PathSegment{{Name: conn.GetCurrentPathSegment().Name}, {Name: ""}},
})
currentName := conn.GetCurrentPathSegment().GetName()
u.createMonitorStream = func() (networkservice.MonitorConnection_MonitorConnectionsClient, error) {
return networkservice.NewMonitorConnectionClient(u.cc).MonitorConnections(u.ctx, &networkservice.MonitorScopeSelector{
PathSegments: []*networkservice.PathSegment{{Name: currentName}, {Name: ""}},
}, grpc.WaitForReady(false))
}

monitorStream, err := u.createMonitorStream()
if err != nil {
return errors.Wrap(err, "MonitorConnections failed")
}

cancel := cancelctx.FromContext(u.ctx)
if cancel == nil {
cancel = func() {}
}
healConnection := requestHealConnectionFunc(ctx)
if healConnection == nil {
healConnection = func(conn *networkservice.Connection) {}
if healConnection := requestHealConnectionFunc(ctx); healConnection != nil {
u.healConnection = healConnection
}
restoreConnection := requestRestoreConnectionFunc(ctx)
if restoreConnection == nil {
restoreConnection = func(conn *networkservice.Connection) {}
if restoreConnection := requestRestoreConnectionFunc(ctx); restoreConnection != nil {
u.restoreConnection = restoreConnection
}

go u.listenToConnectionChanges(cancel, healConnection, restoreConnection, monitorClient)
go u.listenToConnectionChanges(monitorStream)

return nil
}
Expand Down Expand Up @@ -172,25 +185,20 @@ func (u *healClient) Close(ctx context.Context, conn *networkservice.Connection,
// listenToConnectionChanges - loops on events from MonitorConnectionClient while the monitor client is alive.
// Updates connection cache and broadcasts events of successful connections.
// Calls heal when something breaks.
func (u *healClient) listenToConnectionChanges(
cancel context.CancelFunc,
healConnection, restoreConnection requestHealFuncType,
monitorClient networkservice.MonitorConnection_MonitorConnectionsClient,
) {
func (u *healClient) listenToConnectionChanges(monitorStream networkservice.MonitorConnection_MonitorConnectionsClient) {
eventLoop:
for {
event, err := monitorClient.Recv()
if err != nil {
cancel()
u.conns.Range(func(id string, connInfo *connectionInfo) bool {
connInfo.cond.L.Lock()
defer connInfo.cond.L.Unlock()

if connInfo.active {
restoreConnection(connInfo.conn)
}
return true
})
return
event, err := monitorStream.Recv()
for err != nil {
switch ccState, ok := u.waitForCCReady(); {
case !ok:
return
case ccState != connectivity.Ready:
break eventLoop
}
if monitorStream, err = u.createMonitorStream(); err == nil {
continue eventLoop
}
}

for _, eventConn := range event.GetConnections() {
Expand All @@ -213,14 +221,39 @@ func (u *healClient) listenToConnectionChanges(
connInfo.cond.Broadcast()
case networkservice.ConnectionEventType_DELETE:
if connInfo.active {
healConnection(connInfo.conn)
u.healConnection(connInfo.conn)
}
connInfo.active = false
}

connInfo.cond.L.Unlock()
}
}
if u.ctx.Err() != nil {
return
}

u.cancel()

u.conns.Range(func(id string, connInfo *connectionInfo) bool {
connInfo.cond.L.Lock()
defer connInfo.cond.L.Unlock()

if connInfo.active {
u.restoreConnection(connInfo.conn)
}
return true
})
}

// waitForCCReady blocks while the state reported by u.cc.GetState() compares
// below connectivity.Ready, re-reading the state after every state-change
// notification.
//
// It returns the last observed state and true once that state is no longer
// below Ready. The returned state is not guaranteed to be Ready itself, so
// callers must still compare it. It returns the last observed state and false
// as soon as WaitForStateChange reports false.
// NOTE(review): presumably WaitForStateChange returns false only when u.ctx is
// done - confirm against the gRPC ClientConn documentation.
func (u *healClient) waitForCCReady() (connectivity.State, bool) {
	state := u.cc.GetState()
	for state < connectivity.Ready {
		// Wait until the connection leaves the state we just observed.
		if changed := u.cc.WaitForStateChange(u.ctx, state); !changed {
			return state, false
		}
		state = u.cc.GetState()
	}
	return state, true
}

func (u *healClient) replaceConnectionPath(conn *networkservice.Connection, connInfo *connectionInfo) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/networkservice/common/heal/heal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func TestHeal_CloseChain(t *testing.T) {
heal.NewServer(ctx,
heal.WithOnHeal(serverChain)),
clienturl.NewServer(remoteURL),
connect.NewServer(ctx, func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
connect.NewServer(ctx, func(ctx context.Context, cc *grpc.ClientConn) networkservice.NetworkServiceClient {
return next.NewNetworkServiceClient(
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)),
heal.NewClient(ctx, cc),
networkservice.NewNetworkServiceClient(cc),
)
}, connect.WithDialOptions(grpc.WithInsecure())),
Expand All @@ -101,5 +101,5 @@ func TestHeal_CloseChain(t *testing.T) {

require.Eventually(t, func() bool {
return counter.Closes() == 1
}, time.Second, 10*time.Millisecond)
}, 3*time.Second, 10*time.Millisecond)
}
Loading