Skip to content

Commit

Permalink
feat: add externaldnscontext networkservice chain element (#1295)
Browse files Browse the repository at this point in the history
* add externaldnscontext pkg

Signed-off-by: denis-tingaikin <denis.tingajkin@xored.com>

* fix typos

Signed-off-by: denis-tingaikin <denis.tingajkin@xored.com>
  • Loading branch information
denis-tingaikin authored May 17, 2022
1 parent 58c8ced commit 1955d61
Show file tree
Hide file tree
Showing 2 changed files with 392 additions and 0 deletions.
155 changes: 155 additions & 0 deletions pkg/networkservice/connectioncontext/externaldnscontext/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package externaldnscontext gets dnscontext from the remote side.
package externaldnscontext

import (
"context"
"errors"
"net"
"net/url"
"sync/atomic"
"time"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/dns"
"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"

"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type externaldnscontextServer struct {
serverLabels map[string]string
clientQueue chan *dns.DNSRequest
configs atomic.Value
}

// NewServer cretes new externaldnscontext networkservice server instance
func NewServer(ctx context.Context, serverLabels map[string]string, remoteURL *url.URL, opts ...grpc.DialOption) networkservice.NetworkServiceServer {
var r = &externaldnscontextServer{
serverLabels: serverLabels,
clientQueue: make(chan *dns.DNSRequest, 100),
}
var logger = log.FromContext(ctx).WithField("externaldnscontextServer", "managePrefixes go-routine")

r.configs.Store((*dns.Configs)(nil))

go func() {
defer close(r.clientQueue)
<-ctx.Done()
}()

go func() {
for ; ctx.Err() == nil; time.Sleep(time.Millisecond * 100) {
cc, err := grpc.DialContext(ctx, grpcutils.URLToTarget(remoteURL), opts...)
if err != nil {
logger.Errorf("cant dial: %v", err.Error())
continue
}
defer func() { _ = cc.Close() }()

c := dns.NewDNSClient(cc)
resp, err := c.FetchConfigs(ctx, new(empty.Empty))

if err != nil {
logger.Errorf("cant fetch configs: %v")
}

r.configs.Store(resp)

stream, err := c.ManageNames(ctx)
if err != nil {
logger.Errorf("cant open stream: %v", err.Error())
continue
}

for req := range r.clientQueue {
err = stream.Send(req)
if err != nil {
logger.Errorf("cant send msg: %v", err.Error())
break
}
_, err = stream.Recv()
if err != nil {
logger.Errorf("cant recv msg: %v", err.Error())
break
}
}
r.configs.Store((*dns.Configs)(nil))
_ = cc.Close()
}
}()

return r
}

func (e *externaldnscontextServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
var configs []*networkservice.DNSConfig

if v, ok := e.configs.Load().(*dns.Configs); ok && v != nil {
configs = v.Configs
} else {
return nil, errors.New("dns service is not ready yet")
}

if request.GetConnection() == nil {
request.Connection = new(networkservice.Connection)
}
if request.GetConnection().GetContext() == nil {
request.GetConnection().Context = new(networkservice.ConnectionContext)
}
if request.GetConnection().GetContext().GetDnsContext() == nil {
request.GetConnection().GetContext().DnsContext = new(networkservice.DNSContext)
}

e.enqueueDNSRequest(dns.Type_ASSIGN, e.serverLabels, request.GetConnection().GetContext().GetIpContext().GetDstIPNets())
e.enqueueDNSRequest(dns.Type_ASSIGN, request.GetConnection().GetLabels(), request.GetConnection().GetContext().GetIpContext().GetSrcIPNets())

request.GetConnection().GetContext().GetDnsContext().Configs = append(request.GetConnection().GetContext().GetDnsContext().Configs, configs...)

resp, err := next.Server(ctx).Request(ctx, request)

if err != nil {
return nil, err
}

return resp, err
}

func (e *externaldnscontextServer) enqueueDNSRequest(t dns.Type, labels map[string]string, ipNets []*net.IPNet) {
var ips []string
for _, ipNet := range ipNets {
ips = append(ips, ipNet.IP.String())
}

select {
case e.clientQueue <- &dns.DNSRequest{Type: t, Ips: ips, Labels: labels}:
default:
}
}

func (e *externaldnscontextServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
e.enqueueDNSRequest(dns.Type_UNASSIGN, e.serverLabels, conn.GetContext().GetIpContext().GetDstIPNets())
e.enqueueDNSRequest(dns.Type_UNASSIGN, conn.GetLabels(), conn.GetContext().GetIpContext().GetSrcIPNets())

return next.Server(ctx).Close(ctx, conn)
}
237 changes: 237 additions & 0 deletions pkg/networkservice/connectioncontext/externaldnscontext/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package externaldnscontext_test

import (
"context"
"net/url"
"sort"
"strings"
"sync"
"testing"
"time"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/dns"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/retry"
"github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/dnscontext"
"github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/externaldnscontext"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
)

func newTestingDNSRegisterServce(ctx context.Context, t *testing.T) *testingDNSRegisterService {
var s = grpc.NewServer()

var r = new(testingDNSRegisterService)
dns.RegisterDNSServer(s, r)

var serverAddr url.URL

require.Len(t, grpcutils.ListenAndServe(ctx, &serverAddr, s), 0)

r.Addr = serverAddr
return r
}

type testingDNSRegisterService struct {
entries sync.Map
Addr url.URL
}

func (e *testingDNSRegisterService) lenEntries() int {
var r int

e.entries.Range(func(key, value interface{}) bool {
r++
return true
})

return r
}

func (e *testingDNSRegisterService) FetchConfigs(ctx context.Context, _ *empty.Empty) (*dns.Configs, error) {
return &dns.Configs{Configs: []*networkservice.DNSConfig{{DnsServerIps: []string{"8.8.8.8"}}}}, nil
}

func (e *testingDNSRegisterService) ManageNames(s dns.DNS_ManageNamesServer) error {
for s.Context().Err() == nil {
r, err := s.Recv()
if err != nil {
return err
}
var values []string

for _, v := range r.Labels {
values = append(values, v)
}

sort.Strings(values)

var name = strings.Join(values, ".")

if r.Type == dns.Type_ASSIGN {
e.entries.Store(name, r.Ips[0])
} else {
e.entries.Delete(name)
}

err = s.Send(&dns.DNSResponse{
Names: []string{name},
})

if err != nil {
return err
}
}

return nil
}

func Test_ExternalDNSContext_EmptyRequest(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

var ctx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()

var s = newTestingDNSRegisterServce(ctx, t)

var c = retry.NewClient(
adapters.NewServerToClient(
externaldnscontext.NewServer(
ctx,
map[string]string{"app": "vl3", "podName": "nse1"},
&s.Addr, grpc.WithInsecure(),
),
),
retry.WithTryTimeout(time.Second/10),
retry.WithInterval(time.Millisecond*100),
)

_, err := c.Request(ctx, &networkservice.NetworkServiceRequest{})

require.NoError(t, err)
}

func Test_ExternalDNSContext_RequestRefreshClose(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

var ctx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()

var s = newTestingDNSRegisterServce(ctx, t)

var c = retry.NewClient(
adapters.NewServerToClient(
externaldnscontext.NewServer(
ctx,
map[string]string{"app": "vl3", "podName": "nse1"},
&s.Addr, grpc.WithInsecure(),
),
),
retry.WithTryTimeout(time.Second/10),
retry.WithInterval(time.Millisecond*100),
)

req := &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: t.Name(),
Context: &networkservice.ConnectionContext{
IpContext: &networkservice.IPContext{
SrcIpAddrs: []string{"1.1.1.1/32"},
DstIpAddrs: []string{"1.1.1.2/32"},
},
},
},
}

_, err := c.Request(ctx, req)

require.NoError(t, err)

require.Eventually(t, func() bool {
return s.lenEntries() == 2
}, time.Second/2, time.Second/10)

resp, err := c.Request(ctx, req)

require.NoError(t, err)

require.Eventually(t, func() bool {
return s.lenEntries() == 2
}, time.Second/2, time.Second/10)

_, err = c.Close(ctx, resp)
require.NoError(t, err)

require.Eventually(t, func() bool {
return s.lenEntries() == 0
}, time.Second/2, time.Second/10)
}
func Test_ExternalDNSContext_WorksCorrectly_WithDNSContext(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

var ctx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()

var s = newTestingDNSRegisterServce(ctx, t)

var c = retry.NewClient(
adapters.NewServerToClient(
chain.NewNetworkServiceServer(
externaldnscontext.NewServer(
ctx,
map[string]string{"app": "vl3", "podName": "nse1"},
&s.Addr, grpc.WithInsecure(),
),
dnscontext.NewServer(&networkservice.DNSConfig{DnsServerIps: []string{"8.8.4.4"}}),
),
),
retry.WithTryTimeout(time.Second/10),
retry.WithInterval(time.Millisecond*100),
)

req := &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: t.Name(),
Context: &networkservice.ConnectionContext{
IpContext: &networkservice.IPContext{
SrcIpAddrs: []string{"1.1.1.1/32"},
DstIpAddrs: []string{"1.1.1.2/32"},
},
},
},
}

resp, err := c.Request(ctx, req)

require.NoError(t, err)

var ips []string

for _, cfg := range resp.GetContext().GetDnsContext().GetConfigs() {
ips = append(ips, cfg.DnsServerIps...)
}

require.Equal(t, []string{"8.8.8.8", "8.8.4.4"}, ips)
}

0 comments on commit 1955d61

Please sign in to comment.