Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

monitor: handle initial state transfer #1483

Merged
merged 2 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions pkg/networkservice/chains/nsmgr/heal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
nsclient "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/heal"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/null"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkrequest"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkresponse"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror"
Expand Down Expand Up @@ -814,3 +815,82 @@ func TestNSMGR_RefreshFailed_DataPlaneBroken(t *testing.T) {
_, err = nsc.Close(ctx, conn.Clone())
require.NoError(t, err)
}

// This test shows that healing successfully restores the connection if one of the components is killed during the Request
func TestNSMGR_RefreshFailed_ControlPlaneBroken(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

domain := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
Build()

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg := defaultRegistryService(t.Name())
nsReg, err := nsRegistryClient.Register(ctx, nsReg)
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)

counter := new(count.Server)
domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

request := defaultRequest(nsReg.Name)

tokenDuration := time.Minute * 15
clk := clockmock.New(ctx)
clk.Set(time.Now())

// syncCh is used to catch the situation when the forwarder dies during the Request (after the heal chain element)
syncCh := make(chan struct{}, 1)

nsc := domain.Nodes[0].NewClient(ctx,
sandbox.GenerateExpiringToken(tokenDuration),
nsclient.WithHealClient(heal.NewClient(ctx)),
nsclient.WithAdditionalFunctionality(
checkrequest.NewClient(t, func(t *testing.T, request *networkservice.NetworkServiceRequest) {
<-syncCh
}),
),
)

requestCtx, requestCalcel := context.WithTimeout(ctx, time.Second)
requestCtx = clock.WithClock(requestCtx, clk)
defer requestCalcel()

// allow the first Request
syncCh <- struct{}{}
conn, err := nsc.Request(requestCtx, request.Clone())
require.NoError(t, err)
require.Equal(t, 1, counter.Requests())

// refresh interval in this test is expected to be 3 minutes and a few milliseconds
clk.Add(time.Second * 190)

// kill the forwarder during the healing Request (it is stopped by syncCh). Then continue - the healing process will fail.
for _, forwarder := range domain.Nodes[0].Forwarders {
forwarder.Cancel()
break
}
syncCh <- struct{}{}

// create a new forwarder and allow the healing Request
forwarderReg := &registry.NetworkServiceEndpoint{
Name: sandbox.UniqueName("forwarder-2"),
NetworkServiceNames: []string{"forwarder"},
}
domain.Nodes[0].NewForwarder(ctx, forwarderReg, sandbox.GenerateTestToken)
syncCh <- struct{}{}

// wait till Request reached NSE
require.Eventually(t, func() bool {
return counter.Requests() == 2
}, timeout, tick)

_, err = nsc.Close(ctx, conn.Clone())
require.NoError(t, err)
require.Equal(t, 2, counter.Requests())
}
157 changes: 143 additions & 14 deletions pkg/networkservice/chains/nsmgr/upstreamrefresh_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2022-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -21,8 +21,10 @@ import (
"testing"
"time"

"github.com/edwarnicke/genericsync"
"github.com/google/uuid"
"go.uber.org/goleak"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/api/pkg/api/networkservice"
Expand Down Expand Up @@ -62,7 +64,7 @@ func Test_UpstreamRefreshClient(t *testing.T) {
ctx,
nseReg,
sandbox.GenerateTestToken,
newRefreshSenderServer(),
newRefreshMTUSenderServer(),
counter,
)

Expand Down Expand Up @@ -129,7 +131,7 @@ func Test_UpstreamRefreshClient_LocalNotifications(t *testing.T) {
ctx,
nseReg,
sandbox.GenerateTestToken,
newRefreshSenderServer(),
newRefreshMTUSenderServer(),
counter1,
)

Expand All @@ -143,7 +145,7 @@ func Test_UpstreamRefreshClient_LocalNotifications(t *testing.T) {
ctx,
nseReg2,
sandbox.GenerateTestToken,
newRefreshSenderServer(),
newRefreshMTUSenderServer(),
counter2,
)

Expand Down Expand Up @@ -201,46 +203,173 @@ func Test_UpstreamRefreshClient_LocalNotifications(t *testing.T) {
require.NoError(t, err)
}

type refreshSenderServer struct {
m map[string]*networkservice.Connection
// This test shows a case when the event monitor is faster than the backward request
func Test_UpstreamRefreshClientDelay(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

domain := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
SetNSMgrProxySupplier(nil).
SetRegistryProxySupplier(nil).
Build()

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService("my-service"))
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)

// This NSE will send REFRESH_REQUESTED events
// Channels coordinate the test to send the event at the right time
counter := new(count.Server)
ch1 := make(chan struct{}, 1)
ch2 := make(chan struct{}, 1)
_ = domain.Nodes[0].NewEndpoint(
ctx,
nseReg,
sandbox.GenerateTestToken,
newRefreshSenderServer(ch1, ch2),
counter,
)

// Create the client that has slow request processing
nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, client.WithAdditionalFunctionality(
upstreamrefresh.NewClient(ctx),
newSlowHandlerClient(ch1, ch2)),
)

reqCtx, reqClose := context.WithTimeout(ctx, time.Second)
defer reqClose()

req := defaultRequest(nsReg.Name)
req.Connection.Id = uuid.New().String()

conn, err := nsc.Request(reqCtx, req)
require.NoError(t, err)
require.Equal(t, 1, counter.UniqueRequests())

// Eventually we should see a refresh
require.Eventually(t, func() bool { return counter.Requests() == 2 }, timeout, tick)

_, err = nsc.Close(ctx, conn)
require.NoError(t, err)
}

type refreshMTUSenderServer struct {
m *genericsync.Map[string, *networkservice.Connection]
mtu uint32
}

const defaultMtu = 9000

func newRefreshSenderServer() *refreshSenderServer {
return &refreshSenderServer{
m: make(map[string]*networkservice.Connection),
func newRefreshMTUSenderServer() *refreshMTUSenderServer {
return &refreshMTUSenderServer{
m: new(genericsync.Map[string, *networkservice.Connection]),
mtu: defaultMtu,
}
}

func (r *refreshSenderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
func (r *refreshMTUSenderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
return nil, err
}
if conn.GetContext().GetMTU() != r.mtu {
if _, ok := r.m[conn.Id]; ok {
if _, ok := r.m.Load(conn.Id); ok {
return conn, err
}
ec, _ := monitor.LoadEventConsumer(ctx, false)

connectionsToSend := make(map[string]*networkservice.Connection)
for k, v := range r.m {
r.m.Range(func(k string, v *networkservice.Connection) bool {
connectionsToSend[k] = v.Clone()
connectionsToSend[k].State = networkservice.State_REFRESH_REQUESTED
}
return true
})

_ = ec.Send(&networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Connections: connectionsToSend,
})
}
r.m[conn.Id] = conn
r.m.Store(conn.GetId(), conn.Clone())

return conn, err
}

func (r *refreshMTUSenderServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) {
return next.Server(ctx).Close(ctx, conn)
}

type refreshSenderServer struct {
waitSignalCh <-chan struct{}
eventSentCh chan<- struct{}

eventWasSent bool
}

func newRefreshSenderServer(waitSignalCh <-chan struct{}, eventSentCh chan<- struct{}) *refreshSenderServer {
return &refreshSenderServer{
waitSignalCh: waitSignalCh,
eventSentCh: eventSentCh,
}
}

func (r *refreshSenderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
return nil, err
}

if !r.eventWasSent {
ec, _ := monitor.LoadEventConsumer(ctx, false)

c := conn.Clone()
c.State = networkservice.State_REFRESH_REQUESTED

go func() {
<-r.waitSignalCh
_ = ec.Send(&networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Connections: map[string]*networkservice.Connection{c.Id: c},
})
time.Sleep(time.Millisecond * 100)
r.eventWasSent = true
r.eventSentCh <- struct{}{}
}()
} else {
r.eventSentCh <- struct{}{}
}
return conn, err
}

func (r *refreshSenderServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) {
return next.Server(ctx).Close(ctx, conn)
}

type slowHandlerClient struct {
startSlowHandlingCh chan<- struct{}
finishSlowHandlingCh <-chan struct{}
}

func newSlowHandlerClient(startSlowHandlingCh chan<- struct{}, finishSlowHandlingCh <-chan struct{}) networkservice.NetworkServiceClient {
return &slowHandlerClient{
startSlowHandlingCh: startSlowHandlingCh,
finishSlowHandlingCh: finishSlowHandlingCh,
}
}

func (s *slowHandlerClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
conn, err := next.Client(ctx).Request(ctx, request, opts...)
s.startSlowHandlingCh <- struct{}{}
<-s.finishSlowHandlingCh
return conn, err
}

func (s *slowHandlerClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) {
return next.Client(ctx).Close(ctx, conn, opts...)
}
7 changes: 0 additions & 7 deletions pkg/networkservice/common/heal/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@ func newEventLoop(ctx context.Context, cc grpc.ClientConnInterface, conn *networ
return nil, nil, errors.Wrap(err, "failed get MonitorConnections client")
}

// get the initial state transfer and use it to detect whether we have a real connection or not
_, err = client.Recv()
if err != nil {
eventLoopCancel()
return nil, nil, errors.Wrap(err, "failed to get the initial state transfer")
}

logger := log.FromContext(ctx).WithField("heal", "eventLoop")
cev := &eventLoop{
heal: heal,
Expand Down
7 changes: 0 additions & 7 deletions pkg/networkservice/common/monitor/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,6 @@ func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInter
return nil, errors.Wrap(err, "failed to get a MonitorConnections client")
}

// get the initial state transfer and use it to detect whether we have a real connection or not
_, err = client.Recv()
if err != nil {
eventLoopCancel()
return nil, errors.Wrap(err, "failed to get the initial state transfer")
}

cev := &eventLoop{
eventLoopCtx: eventLoopCtx,
conn: conn,
Expand Down
3 changes: 2 additions & 1 deletion pkg/networkservice/common/upstreamrefresh/client_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) {
Connections: make(map[string]*networkservice.Connection),
}
for _, connIn := range eventIn.GetConnections() {
if eventIn.GetType() != networkservice.ConnectionEventType_UPDATE {
if eventIn.GetType() != networkservice.ConnectionEventType_UPDATE &&
eventIn.GetType() != networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER {
continue
}

Expand Down
7 changes: 0 additions & 7 deletions pkg/networkservice/common/upstreamrefresh/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ func newEventLoop(ctx context.Context, cc grpc.ClientConnInterface, conn *networ
return nil, errors.Wrap(err, "failed to get a MonitorConnection client")
}

// get the initial state transfer and use it to detect whether we have a real connection or not
_, err = client.Recv()
if err != nil {
eventLoopCancel()
return nil, errors.Wrap(err, "failed to get the initial state transfer")
}

logger := log.FromContext(ctx).WithField("upstreamrefresh", "eventLoop")
cev := &eventLoop{
eventLoopCtx: eventLoopCtx,
Expand Down
Loading