Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[qfix] Add WithEndpointChange heal option #1082

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/networkservice/chains/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ func NewClient(ctx context.Context, connectTo *url.URL, clientOpts ...Option) ne
append(
opts.additionalFunctionality,
opts.authorizeClient,
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)),
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc),
heal.WithEndpointChange()),
networkservice.NewNetworkServiceClient(cc),
)...,
)
Expand Down
20 changes: 14 additions & 6 deletions pkg/networkservice/chains/nsmgr/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (s *nsmgrSuite) Test_LocalUsecase() {
func (s *nsmgrSuite) Test_PassThroughRemoteUsecase() {
t := s.T()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()

counterClose := new(count.Server)
Expand Down Expand Up @@ -444,8 +444,12 @@ func (s *nsmgrSuite) Test_PassThroughRemoteUsecase() {
}

// Refresh
conn, err = nsc.Request(ctx, request)
require.NoError(t, err)
for i := 0; i < 5; i++ {
request.Connection = conn.Clone()

conn, err = nsc.Request(ctx, request)
require.NoError(t, err)
}

// Close
_, err = nsc.Close(ctx, conn)
Expand All @@ -463,7 +467,7 @@ func (s *nsmgrSuite) Test_PassThroughLocalUsecase() {
t := s.T()
const nsesCount = 7

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()

counterClose := new(count.Server)
Expand Down Expand Up @@ -503,8 +507,12 @@ func (s *nsmgrSuite) Test_PassThroughLocalUsecase() {
}

// Refresh
conn, err = nsc.Request(ctx, request)
require.NoError(t, err)
for i := 0; i < 5; i++ {
request.Connection = conn.Clone()

conn, err = nsc.Request(ctx, request)
require.NoError(t, err)
}

// Close
_, err = nsc.Close(ctx, conn)
Expand Down
27 changes: 18 additions & 9 deletions pkg/networkservice/common/heal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,28 @@ type connectionInfo struct {
}

type healClient struct {
ctx context.Context
cc networkservice.MonitorConnectionClient
initOnce sync.Once
initErr error
conns connectionInfoMap
ctx context.Context
cc networkservice.MonitorConnectionClient
initOnce sync.Once
initErr error
conns connectionInfoMap
changeEndpoint bool
}

// NewClient - creates a new networkservice.NetworkServiceClient chain element that monitors its connections' state
// and calls heal server in case connection breaks if heal server is present in the chain
// - ctx - context for the lifecycle of the *Client* itself. Cancel when discarding the client.
// - cc - MonitorConnectionClient that will be used to watch for connection confirmations and breakages.
func NewClient(ctx context.Context, cc networkservice.MonitorConnectionClient) networkservice.NetworkServiceClient {
func NewClient(ctx context.Context, cc networkservice.MonitorConnectionClient, opts ...Option) networkservice.NetworkServiceClient {
healOpts := new(healOptions)
for _, opt := range opts {
opt(healOpts)
}

return &healClient{
ctx: ctx,
cc: cc,
ctx: ctx,
cc: cc,
changeEndpoint: healOpts.changeEndpoint,
}
}

Expand Down Expand Up @@ -221,7 +228,9 @@ func (u *healClient) listenToConnectionChanges(
case networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, networkservice.ConnectionEventType_UPDATE:
connInfo.active = true
connInfo.conn.Path.PathSegments = eventConn.Clone().Path.PathSegments
connInfo.conn.NetworkServiceEndpointName = eventConn.NetworkServiceEndpointName
if u.changeEndpoint {
connInfo.conn.NetworkServiceEndpointName = eventConn.NetworkServiceEndpointName
}
connInfo.cond.Broadcast()
case networkservice.ConnectionEventType_DELETE:
if connInfo.active {
Expand Down
8 changes: 8 additions & 0 deletions pkg/networkservice/common/heal/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,16 @@ func WithRestoreTimeout(restoreTimeout time.Duration) Option {
}
}

// WithEndpointChange sets if Connection.EndpointName can be changed with monitor updates
func WithEndpointChange() Option {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Bolodya1997 Could you please explain in what cases we plan do not use this option?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is NSC-only option. Do you mean adding such comment in code?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite follow why do we need the option only for nscs.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We currently have the following on the Request:

  1. NSC -> NSMgr : endpointName == ""
  2. NSMgr -> Forwarder -> NSMgr -> Passthrough : endpointName == "Passthrough"
  3. Passthrough -> NSMgr : endpointName == ""
  4. NSMgr -> Forwarder -> NSMgr -> NSE : endpointName == "NSE"
  5. Passthrough <- NSMgr <- Forwarder <- NSMgr <- NSE : endpointName == "NSE"
  6. NSC <- NSMgr <- Forwarder <- NSMgr <- Passthrough : endpointName == "Passthrough"

On refresh Request we should have the following:

  1. NSC -> NSMgr -> Forwarder -> NSMgr -> Passthrough : endpointName == "Passthrough"
  2. Passthrough -> NSMgr : endpointName == ""
  3. NSMgr -> Forwarder -> NSMgr -> NSE : endpointName == "NSE"
  4. Passthrough <- NSMgr <- Forwarder <- NSMgr <- NSE : endpointName == "NSE"
  5. NSC <- NSMgr <- Forwarder <- NSMgr <- Passthrough : endpointName == "Passthrough"

But because of heal client standing in the chain before the replacelabels client after refresh happen in NSMgr (Passthrough -> NSMgr) we will have the following:

  1. NSC -> NSMgr -> Forwarder -> NSMgr -> Passthrough : endpointName == "Passthrough"
  2. Passthrough -> NSMgr -> Forwarder -> NSMgr -> NSE : endpointName == "NSE"
  3. NSC <- NSMgr <- Forwarder <- NSMgr <- Passthrough <- NSMgr <- Forwarder <- NSMgr <- NSE : endpointName == "NSE"

And so subsequent refresh Request will fail because of invalid Endpoint name.

We cannot move heal client after additionalFunctionality clients, because it will lead to problems with mechanismtranslation client. So we can just disable endpointName update for all applications except NSC.

return func(healOpts *healOptions) {
healOpts.changeEndpoint = true
}
}

type healOptions struct {
onHeal *networkservice.NetworkServiceClient
onRestore OnRestore
restoreTimeout time.Duration
changeEndpoint bool
}