diff --git a/pkg/networkservice/chains/client/client.go b/pkg/networkservice/chains/client/client.go index 9e3cc56f4..b826dacff 100644 --- a/pkg/networkservice/chains/client/client.go +++ b/pkg/networkservice/chains/client/client.go @@ -122,7 +122,8 @@ func NewClient(ctx context.Context, connectTo *url.URL, clientOpts ...Option) ne append( opts.additionalFunctionality, opts.authorizeClient, - heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)), + heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc), + heal.WithEndpointChange()), networkservice.NewNetworkServiceClient(cc), )..., ) diff --git a/pkg/networkservice/chains/nsmgr/suite_test.go b/pkg/networkservice/chains/nsmgr/suite_test.go index 6efdce777..3bbe4ac09 100644 --- a/pkg/networkservice/chains/nsmgr/suite_test.go +++ b/pkg/networkservice/chains/nsmgr/suite_test.go @@ -402,7 +402,7 @@ func (s *nsmgrSuite) Test_LocalUsecase() { func (s *nsmgrSuite) Test_PassThroughRemoteUsecase() { t := s.T() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) defer cancel() counterClose := new(count.Server) @@ -444,8 +444,12 @@ func (s *nsmgrSuite) Test_PassThroughRemoteUsecase() { } // Refresh - conn, err = nsc.Request(ctx, request) - require.NoError(t, err) + for i := 0; i < 5; i++ { + request.Connection = conn.Clone() + + conn, err = nsc.Request(ctx, request) + require.NoError(t, err) + } // Close _, err = nsc.Close(ctx, conn) @@ -463,7 +467,7 @@ func (s *nsmgrSuite) Test_PassThroughLocalUsecase() { t := s.T() const nsesCount = 7 - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) defer cancel() counterClose := new(count.Server) @@ -503,8 +507,12 @@ func (s *nsmgrSuite) Test_PassThroughLocalUsecase() { } // Refresh - conn, err = nsc.Request(ctx, request) - require.NoError(t, err) + for i := 0; i < 5; i++ { + request.Connection = conn.Clone() + + conn, err = nsc.Request(ctx, request) + require.NoError(t, err) + } // Close _, err = nsc.Close(ctx, conn) diff --git a/pkg/networkservice/common/heal/client.go b/pkg/networkservice/common/heal/client.go index 41b24835c..54b3525ef 100644 --- a/pkg/networkservice/common/heal/client.go +++ b/pkg/networkservice/common/heal/client.go @@ -38,21 +38,28 @@ type connectionInfo struct { } type healClient struct { - ctx context.Context - cc networkservice.MonitorConnectionClient - initOnce sync.Once - initErr error - conns connectionInfoMap + ctx context.Context + cc networkservice.MonitorConnectionClient + initOnce sync.Once + initErr error + conns connectionInfoMap + changeEndpoint bool } // NewClient - creates a new networkservice.NetworkServiceClient chain element that monitors its connections' state // and calls heal server in case connection breaks if heal server is present in the chain // - ctx - context for the lifecycle of the *Client* itself. Cancel when discarding the client. // - cc - MonitorConnectionClient that will be used to watch for connection confirmations and breakages. -func NewClient(ctx context.Context, cc networkservice.MonitorConnectionClient) networkservice.NetworkServiceClient { +func NewClient(ctx context.Context, cc networkservice.MonitorConnectionClient, opts ...Option) networkservice.NetworkServiceClient { + healOpts := new(healOptions) + for _, opt := range opts { + opt(healOpts) + } + return &healClient{ - ctx: ctx, - cc: cc, + ctx: ctx, + cc: cc, + changeEndpoint: healOpts.changeEndpoint, } } @@ -221,7 +228,9 @@ func (u *healClient) listenToConnectionChanges( case networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, networkservice.ConnectionEventType_UPDATE: connInfo.active = true connInfo.conn.Path.PathSegments = eventConn.Clone().Path.PathSegments - connInfo.conn.NetworkServiceEndpointName = eventConn.NetworkServiceEndpointName + if u.changeEndpoint { + connInfo.conn.NetworkServiceEndpointName = eventConn.NetworkServiceEndpointName + } connInfo.cond.Broadcast() case networkservice.ConnectionEventType_DELETE: if connInfo.active { diff --git a/pkg/networkservice/common/heal/option.go b/pkg/networkservice/common/heal/option.go index c7fcb98af..2c07f94bf 100644 --- a/pkg/networkservice/common/heal/option.go +++ b/pkg/networkservice/common/heal/option.go @@ -67,8 +67,16 @@ func WithRestoreTimeout(restoreTimeout time.Duration) Option { } } +// WithEndpointChange sets if Connection.EndpointName can be changed with monitor updates +func WithEndpointChange() Option { + return func(healOpts *healOptions) { + healOpts.changeEndpoint = true + } +} + type healOptions struct { onHeal *networkservice.NetworkServiceClient onRestore OnRestore restoreTimeout time.Duration + changeEndpoint bool }