Skip to content

Commit

Permalink
monitor: handle initial state transfer (#1483)
Browse files Browse the repository at this point in the history
* monitor: handle initial state transfer

Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>

* Add healing test

Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>

---------

Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>
  • Loading branch information
glazychev-art authored Nov 23, 2023
1 parent cb5a6d2 commit d109337
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 36 deletions.
80 changes: 80 additions & 0 deletions pkg/networkservice/chains/nsmgr/heal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
nsclient "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/heal"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/null"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkrequest"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkresponse"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror"
Expand Down Expand Up @@ -814,3 +815,82 @@ func TestNSMGR_RefreshFailed_DataPlaneBroken(t *testing.T) {
_, err = nsc.Close(ctx, conn.Clone())
require.NoError(t, err)
}

// This test shows that healing successfully restores the connection if one of the components is killed during the Request
func TestNSMGR_RefreshFailed_ControlPlaneBroken(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

domain := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
Build()

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg := defaultRegistryService(t.Name())
nsReg, err := nsRegistryClient.Register(ctx, nsReg)
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)

counter := new(count.Server)
domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

request := defaultRequest(nsReg.Name)

tokenDuration := time.Minute * 15
clk := clockmock.New(ctx)
clk.Set(time.Now())

// syncCh is used to catch the situation when the forwarder dies during the Request (after the heal chain element)
syncCh := make(chan struct{}, 1)

nsc := domain.Nodes[0].NewClient(ctx,
sandbox.GenerateExpiringToken(tokenDuration),
nsclient.WithHealClient(heal.NewClient(ctx)),
nsclient.WithAdditionalFunctionality(
checkrequest.NewClient(t, func(t *testing.T, request *networkservice.NetworkServiceRequest) {
<-syncCh
}),
),
)

requestCtx, requestCalcel := context.WithTimeout(ctx, time.Second)
requestCtx = clock.WithClock(requestCtx, clk)
defer requestCalcel()

// allow the first Request
syncCh <- struct{}{}
conn, err := nsc.Request(requestCtx, request.Clone())
require.NoError(t, err)
require.Equal(t, 1, counter.Requests())

// refresh interval in this test is expected to be 3 minutes and a few milliseconds
clk.Add(time.Second * 190)

// kill the forwarder during the healing Request (it is stopped by syncCh). Then continue - the healing process will fail.
for _, forwarder := range domain.Nodes[0].Forwarders {
forwarder.Cancel()
break
}
syncCh <- struct{}{}

// create a new forwarder and allow the healing Request
forwarderReg := &registry.NetworkServiceEndpoint{
Name: sandbox.UniqueName("forwarder-2"),
NetworkServiceNames: []string{"forwarder"},
}
domain.Nodes[0].NewForwarder(ctx, forwarderReg, sandbox.GenerateTestToken)
syncCh <- struct{}{}

// wait till Request reached NSE
require.Eventually(t, func() bool {
return counter.Requests() == 2
}, timeout, tick)

_, err = nsc.Close(ctx, conn.Clone())
require.NoError(t, err)
require.Equal(t, 2, counter.Requests())
}
157 changes: 143 additions & 14 deletions pkg/networkservice/chains/nsmgr/upstreamrefresh_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2022-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -21,8 +21,10 @@ import (
"testing"
"time"

"github.com/edwarnicke/genericsync"
"github.com/google/uuid"
"go.uber.org/goleak"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/api/pkg/api/networkservice"
Expand Down Expand Up @@ -62,7 +64,7 @@ func Test_UpstreamRefreshClient(t *testing.T) {
ctx,
nseReg,
sandbox.GenerateTestToken,
newRefreshSenderServer(),
newRefreshMTUSenderServer(),
counter,
)

Expand Down Expand Up @@ -129,7 +131,7 @@ func Test_UpstreamRefreshClient_LocalNotifications(t *testing.T) {
ctx,
nseReg,
sandbox.GenerateTestToken,
newRefreshSenderServer(),
newRefreshMTUSenderServer(),
counter1,
)

Expand All @@ -143,7 +145,7 @@ func Test_UpstreamRefreshClient_LocalNotifications(t *testing.T) {
ctx,
nseReg2,
sandbox.GenerateTestToken,
newRefreshSenderServer(),
newRefreshMTUSenderServer(),
counter2,
)

Expand Down Expand Up @@ -201,46 +203,173 @@ func Test_UpstreamRefreshClient_LocalNotifications(t *testing.T) {
require.NoError(t, err)
}

type refreshSenderServer struct {
m map[string]*networkservice.Connection
// This test shows a case when the event monitor is faster than the backward request
func Test_UpstreamRefreshClientDelay(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

domain := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
SetNSMgrProxySupplier(nil).
SetRegistryProxySupplier(nil).
Build()

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService("my-service"))
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)

// This NSE will send REFRESH_REQUESTED events
// Channels coordinate the test to send the event at the right time
counter := new(count.Server)
ch1 := make(chan struct{}, 1)
ch2 := make(chan struct{}, 1)
_ = domain.Nodes[0].NewEndpoint(
ctx,
nseReg,
sandbox.GenerateTestToken,
newRefreshSenderServer(ch1, ch2),
counter,
)

// Create the client that has slow request processing
nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, client.WithAdditionalFunctionality(
upstreamrefresh.NewClient(ctx),
newSlowHandlerClient(ch1, ch2)),
)

reqCtx, reqClose := context.WithTimeout(ctx, time.Second)
defer reqClose()

req := defaultRequest(nsReg.Name)
req.Connection.Id = uuid.New().String()

conn, err := nsc.Request(reqCtx, req)
require.NoError(t, err)
require.Equal(t, 1, counter.UniqueRequests())

// Eventually we should see a refresh
require.Eventually(t, func() bool { return counter.Requests() == 2 }, timeout, tick)

_, err = nsc.Close(ctx, conn)
require.NoError(t, err)
}

type refreshMTUSenderServer struct {
m *genericsync.Map[string, *networkservice.Connection]
mtu uint32
}

const defaultMtu = 9000

func newRefreshSenderServer() *refreshSenderServer {
return &refreshSenderServer{
m: make(map[string]*networkservice.Connection),
func newRefreshMTUSenderServer() *refreshMTUSenderServer {
return &refreshMTUSenderServer{
m: new(genericsync.Map[string, *networkservice.Connection]),
mtu: defaultMtu,
}
}

func (r *refreshSenderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
func (r *refreshMTUSenderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
return nil, err
}
if conn.GetContext().GetMTU() != r.mtu {
if _, ok := r.m[conn.Id]; ok {
if _, ok := r.m.Load(conn.Id); ok {
return conn, err
}
ec, _ := monitor.LoadEventConsumer(ctx, false)

connectionsToSend := make(map[string]*networkservice.Connection)
for k, v := range r.m {
r.m.Range(func(k string, v *networkservice.Connection) bool {
connectionsToSend[k] = v.Clone()
connectionsToSend[k].State = networkservice.State_REFRESH_REQUESTED
}
return true
})

_ = ec.Send(&networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Connections: connectionsToSend,
})
}
r.m[conn.Id] = conn
r.m.Store(conn.GetId(), conn.Clone())

return conn, err
}

func (r *refreshMTUSenderServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) {
return next.Server(ctx).Close(ctx, conn)
}

type refreshSenderServer struct {
waitSignalCh <-chan struct{}
eventSentCh chan<- struct{}

eventWasSent bool
}

func newRefreshSenderServer(waitSignalCh <-chan struct{}, eventSentCh chan<- struct{}) *refreshSenderServer {
return &refreshSenderServer{
waitSignalCh: waitSignalCh,
eventSentCh: eventSentCh,
}
}

func (r *refreshSenderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
return nil, err
}

if !r.eventWasSent {
ec, _ := monitor.LoadEventConsumer(ctx, false)

c := conn.Clone()
c.State = networkservice.State_REFRESH_REQUESTED

go func() {
<-r.waitSignalCh
_ = ec.Send(&networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Connections: map[string]*networkservice.Connection{c.Id: c},
})
time.Sleep(time.Millisecond * 100)
r.eventWasSent = true
r.eventSentCh <- struct{}{}
}()
} else {
r.eventSentCh <- struct{}{}
}
return conn, err
}

func (r *refreshSenderServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) {
return next.Server(ctx).Close(ctx, conn)
}

type slowHandlerClient struct {
startSlowHandlingCh chan<- struct{}
finishSlowHandlingCh <-chan struct{}
}

func newSlowHandlerClient(startSlowHandlingCh chan<- struct{}, finishSlowHandlingCh <-chan struct{}) networkservice.NetworkServiceClient {
return &slowHandlerClient{
startSlowHandlingCh: startSlowHandlingCh,
finishSlowHandlingCh: finishSlowHandlingCh,
}
}

func (s *slowHandlerClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
conn, err := next.Client(ctx).Request(ctx, request, opts...)
s.startSlowHandlingCh <- struct{}{}
<-s.finishSlowHandlingCh
return conn, err
}

func (s *slowHandlerClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) {
return next.Client(ctx).Close(ctx, conn, opts...)
}
7 changes: 0 additions & 7 deletions pkg/networkservice/common/heal/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@ func newEventLoop(ctx context.Context, cc grpc.ClientConnInterface, conn *networ
return nil, nil, errors.Wrap(err, "failed get MonitorConnections client")
}

// get the initial state transfer and use it to detect whether we have a real connection or not
_, err = client.Recv()
if err != nil {
eventLoopCancel()
return nil, nil, errors.Wrap(err, "failed to get the initial state transfer")
}

logger := log.FromContext(ctx).WithField("heal", "eventLoop")
cev := &eventLoop{
heal: heal,
Expand Down
7 changes: 0 additions & 7 deletions pkg/networkservice/common/monitor/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,6 @@ func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInter
return nil, errors.Wrap(err, "failed to get a MonitorConnections client")
}

// get the initial state transfer and use it to detect whether we have a real connection or not
_, err = client.Recv()
if err != nil {
eventLoopCancel()
return nil, errors.Wrap(err, "failed to get the initial state transfer")
}

cev := &eventLoop{
eventLoopCtx: eventLoopCtx,
conn: conn,
Expand Down
3 changes: 2 additions & 1 deletion pkg/networkservice/common/upstreamrefresh/client_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) {
Connections: make(map[string]*networkservice.Connection),
}
for _, connIn := range eventIn.GetConnections() {
if eventIn.GetType() != networkservice.ConnectionEventType_UPDATE {
if eventIn.GetType() != networkservice.ConnectionEventType_UPDATE &&
eventIn.GetType() != networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER {
continue
}

Expand Down
7 changes: 0 additions & 7 deletions pkg/networkservice/common/upstreamrefresh/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ func newEventLoop(ctx context.Context, cc grpc.ClientConnInterface, conn *networ
return nil, errors.Wrap(err, "failed to get a MonitorConnection client")
}

// get the initial state transfer and use it to detect whether we have a real connection or not
_, err = client.Recv()
if err != nil {
eventLoopCancel()
return nil, errors.Wrap(err, "failed to get the initial state transfer")
}

logger := log.FromContext(ctx).WithField("upstreamrefresh", "eventLoop")
cev := &eventLoop{
eventLoopCtx: eventLoopCtx,
Expand Down

0 comments on commit d109337

Please sign in to comment.