Skip to content

Commit

Permalink
discoverServer: handle Close (networkservicemesh#571)
Browse files Browse the repository at this point in the history
Signed-off-by: Albert Safin <albert.safin@xored.com>
Signed-off-by: Sergey Ershov <sergey.ershov@xored.com>
  • Loading branch information
xzfc authored and Sergey Ershov committed Dec 23, 2020
1 parent 5dce8a0 commit 9b4f8c2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 233 deletions.
199 changes: 19 additions & 180 deletions pkg/networkservice/chains/nsmgr/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io/ioutil"
"net/url"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -231,7 +232,6 @@ func TestNSMGR_RemoteUsecase(t *testing.T) {
require.Equal(t, 8, len(conn.Path.PathSegments))

// Close.

e, err := nsc.Close(ctx, conn)
require.NoError(t, err)
require.NotNil(t, e)
Expand All @@ -254,6 +254,9 @@ func TestNSMGR_LocalUsecase(t *testing.T) {
Name: "final-endpoint",
NetworkServiceNames: []string{"my-service-remote"},
}
counter := &counterServer{}
_, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr, counter)
require.NoError(t, err)

counter := &counterServer{}
_, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr, counter)
Expand All @@ -271,7 +274,6 @@ func TestNSMGR_LocalUsecase(t *testing.T) {
Context: &networkservice.ConnectionContext{},
},
}

conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.NotNil(t, conn)
Expand All @@ -287,191 +289,14 @@ func TestNSMGR_LocalUsecase(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, conn2)
require.Equal(t, 5, len(conn2.Path.PathSegments))
require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests))
// Close.

// Close.
e, err := nsc.Close(ctx, conn)
require.NoError(t, err)
require.NotNil(t, e)
require.Equal(t, int32(1), atomic.LoadInt32(&counter.Closes))
}

func TestNSMGR_PassThroughRemote(t *testing.T) {
nodesCount := 7

defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
logrus.SetOutput(ioutil.Discard)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
domain := sandbox.NewBuilder(t).
SetNodesCount(nodesCount).
SetContext(ctx).
SetRegistryProxySupplier(nil).
Build()
defer domain.Cleanup()

for i := 0; i < nodesCount; i++ {
var additionalFunctionality []networkservice.NetworkServiceServer
if i != 0 {
k := i
// Passtrough to the node i-1
additionalFunctionality = []networkservice.NetworkServiceServer{
chain.NewNetworkServiceServer(
clienturl.NewServer(domain.Nodes[i].NSMgr.URL),
connect.NewServer(ctx,
sandbox.NewCrossConnectClientFactory(sandbox.GenerateTestToken,
newPassTroughClient(
fmt.Sprintf("my-service-remote-%v", k-1),
fmt.Sprintf("endpoint-%v", k-1)),
kernel.NewClient()),
append(spanhelper.WithTracingDial(), grpc.WithBlock(), grpc.WithInsecure())...,
),
),
}
}
nseReg := &registry.NetworkServiceEndpoint{
Name: fmt.Sprintf("endpoint-%v", i),
NetworkServiceNames: []string{fmt.Sprintf("my-service-remote-%v", i)},
}
_, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[i].NSMgr, additionalFunctionality...)
require.NoError(t, err)
}

nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[nodesCount-1].NSMgr.URL)

request := &networkservice.NetworkServiceRequest{
MechanismPreferences: []*networkservice.Mechanism{
{Cls: cls.LOCAL, Type: kernelmech.MECHANISM},
},
Connection: &networkservice.Connection{
Id: "1",
NetworkService: fmt.Sprintf("my-service-remote-%v", nodesCount-1),
Context: &networkservice.ConnectionContext{},
},
}

conn, err := nsc.Request(ctx, request)
require.NoError(t, err)
require.NotNil(t, conn)

// Path length to first endpoint is 5
// Path length from NSE client to other remote endpoint is 8
require.Equal(t, 8*(nodesCount-1)+5, len(conn.Path.PathSegments))
}

func TestNSMGR_PassThroughLocal(t *testing.T) {
nsesCount := 7

defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
logrus.SetOutput(ioutil.Discard)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
domain := sandbox.NewBuilder(t).
SetNodesCount(1).
SetContext(ctx).
SetRegistryProxySupplier(nil).
Build()
defer domain.Cleanup()

for i := 0; i < nsesCount; i++ {
var additionalFunctionality []networkservice.NetworkServiceServer
if i != 0 {
k := i
additionalFunctionality = []networkservice.NetworkServiceServer{
chain.NewNetworkServiceServer(
clienturl.NewServer(domain.Nodes[0].NSMgr.URL),
connect.NewServer(ctx,
sandbox.NewCrossConnectClientFactory(sandbox.GenerateTestToken,
newPassTroughClient(
fmt.Sprintf("my-service-remote-%v", k-1),
fmt.Sprintf("endpoint-%v", k-1)),
kernel.NewClient()),
append(spanhelper.WithTracingDial(), grpc.WithBlock(), grpc.WithInsecure())...,
),
),
}
}
nseReg := &registry.NetworkServiceEndpoint{
Name: fmt.Sprintf("endpoint-%v", i),
NetworkServiceNames: []string{fmt.Sprintf("my-service-remote-%v", i)},
}
_, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr, additionalFunctionality...)
require.NoError(t, err)
}

nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL)

request := &networkservice.NetworkServiceRequest{
MechanismPreferences: []*networkservice.Mechanism{
{Cls: cls.LOCAL, Type: kernelmech.MECHANISM},
},
Connection: &networkservice.Connection{
Id: "1",
NetworkService: fmt.Sprintf("my-service-remote-%v", nsesCount-1),
Context: &networkservice.ConnectionContext{},
},
}

conn, err := nsc.Request(ctx, request)
require.NoError(t, err)
require.NotNil(t, conn)

// Path length to first endpoint is 5
// Path length from NSE client to other local endpoint is 5
require.Equal(t, 5*(nsesCount-1)+5, len(conn.Path.PathSegments))
}

type passThroughClient struct {
networkService string
networkServiceEndpointName string
}

func newPassTroughClient(networkService, networkServiceEndpointName string) *passThroughClient {
return &passThroughClient{
networkService: networkService,
networkServiceEndpointName: networkServiceEndpointName,
}
}

func (c *passThroughClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
request.Connection.NetworkService = c.networkService
request.Connection.NetworkServiceEndpointName = c.networkServiceEndpointName
return next.Client(ctx).Request(ctx, request, opts...)
}

func (c *passThroughClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) {
conn.NetworkService = c.networkService
conn.NetworkServiceEndpointName = c.networkServiceEndpointName
return next.Client(ctx).Close(ctx, conn, opts...)
}

type counterServer struct {
Requests, Closes int32
}

func (c *counterServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
atomic.AddInt32(&c.Requests, 1)
return next.Server(ctx).Request(ctx, request)
}

func (c *counterServer) Close(ctx context.Context, connection *networkservice.Connection) (*empty.Empty, error) {
atomic.AddInt32(&c.Closes, 1)
return next.Server(ctx).Close(ctx, connection)
}

type busyEndpoint struct{}

func (c *busyEndpoint) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
return nil, errors.New("sorry, endpoint is busy, try again later")
}

func (c *busyEndpoint) Close(ctx context.Context, connection *networkservice.Connection) (*empty.Empty, error) {
return nil, errors.New("sorry, endpoint is busy, try again later")
}
func newBusyEndpoint() networkservice.NetworkServiceServer {
return new(busyEndpoint)
}

func TestNSMGR_PassThroughRemote(t *testing.T) {
nodesCount := 7

Expand Down Expand Up @@ -641,3 +466,17 @@ func (p *passThroughClient) Close(ctx context.Context, conn *networkservice.Conn
conn = conn.Clone()
return next.Client(ctx).Close(ctx, conn, opts...)
}

type counterServer struct {
Requests, Closes int32
}

func (c *counterServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
atomic.AddInt32(&c.Requests, 1)
return next.Server(ctx).Request(ctx, request)
}

func (c *counterServer) Close(ctx context.Context, connection *networkservice.Connection) (*empty.Empty, error) {
atomic.AddInt32(&c.Closes, 1)
return next.Server(ctx).Close(ctx, connection)
}
79 changes: 26 additions & 53 deletions pkg/networkservice/common/discover/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,34 +48,7 @@ func NewServer(nsClient registry.NetworkServiceRegistryClient, nseClient registr
func (d *discoverCandidatesServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
nseName := request.GetConnection().GetNetworkServiceEndpointName()
if nseName != "" {
nse, err := d.discoverNetworkServiceEndpoint(ctx, nseName)
if err != nil {
return nil, err
}
u, err := url.Parse(nse.Url)
if err != nil {
return nil, errors.WithStack(err)
}
return next.Server(ctx).Request(clienturlctx.WithClientURL(ctx, u), request)
}
ns, err := d.discoverNetworkService(ctx, request.GetConnection().GetNetworkService(), request.GetConnection().GetPayload())
if err != nil {
return nil, err
}
nses, err := d.discoverNetworkServiceEndpoints(ctx, ns, request.GetConnection().GetLabels())
if err != nil {
return nil, err
}
visit := map[string]struct{}{}
for ctx.Err() == nil {
resp, err := next.Server(ctx).Request(WithCandidates(ctx, nses, ns), request)
if err == nil {
return resp, err
}
for _, nse := range nses {
visit[nse.Name] = struct{}{}
}
nses, err = d.discoverNetworkServiceEndpoints(ctx, ns, request.GetConnection().GetLabels())
u, err := d.urlByNseName(nseName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,35 +154,35 @@ func (d *discoverCandidatesServer) discoverNetworkServiceEndpoints(ctx context.C
}
}

func (d *discoverCandidatesServer) discoverNetworkService(ctx context.Context, name, payload string) (*registry.NetworkService, error) {
query := &registry.NetworkServiceQuery{
NetworkService: &registry.NetworkService{
Name: name,
Payload: payload,
},
func (d *discoverCandidatesServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
nseName := conn.GetNetworkServiceEndpointName()
if nseName == "" {
// If it's an existing connection, the NSE name should be set. Otherwise, it's probably an API misuse.
return nil, errors.Errorf("network_service_endpoint_name is not set")
}
nsStream, err := d.nsClient.Find(ctx, query)
u, err := d.urlByNseName(nseName)
if err != nil {
return nil, errors.WithStack(err)
}
nsList := registry.ReadNetworkServiceList(nsStream)
if len(nsList) != 0 {
return nsList[0], nil
return nil, err
}
ctx, cancelFind := context.WithCancel(ctx)
defer cancelFind()
query.Watch = true
nsStream, err = d.nsClient.Find(ctx, query)
return next.Server(ctx).Close(clienturlctx.WithClientURL(ctx, u), conn)
}

func (d *discoverCandidatesServer) urlByNseName(nseName string) (*url.URL, error) {
nseStream, err := d.nseClient.Find(context.Background(), &registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
Name: nseName,
},
})
if err != nil {
return nil, errors.WithStack(err)
return nil, err
}
select {
case <-ctx.Done():
return nil, errors.Wrapf(ctx.Err(), "ns:\"%v\" with payload:\"%v\" is not found", name, payload)
case ns, ok := <-registry.ReadNetworkServiceChannel(nsStream):
if ok {
return ns, nil
}
return nil, errors.New("ns stream is closed")
nseList := registry.ReadNetworkServiceEndpointList(nseStream)
if len(nseList) == 0 {
return nil, errors.Errorf("network service endpoint %s is not found", nseName)
}
u, err := url.Parse(nseList[0].Url)
if err != nil {
return nil, err
}
return u, nil
}

0 comments on commit 9b4f8c2

Please sign in to comment.