Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract sockets #470

Merged
merged 6 commits into from
Dec 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ jobs:
- name: Build
run: go build -race ./...
- name: Test
if: matrix.os != 'windows-latest'
run: sudo -E PATH="$PATH" bash -c "go test -race ./..."
- name: Test
if: matrix.os == 'windows-latest'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain why this change?

Copy link
Contributor Author

@Mixaster995 Mixaster995 Dec 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test https://github.com/Mixaster995/sdk-vpp/blob/abstract-sockets/pkg/tools/proxy/proxy_test.go#L44
failes without these lines
The reason is creation of a new network namespace throws operation not permitted on linux, so we have to use sudo, and this construction in ci passes correct envs to it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it :)

run: go test -race ./...
golangci-lint:
name: golangci-lint
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ require (
github.com/stretchr/testify v1.7.0
github.com/thanhpk/randstr v1.0.4
github.com/vishvananda/netlink v1.1.0
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae
go.uber.org/goleak v1.1.10
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200609130330-bd2cb7843e1b
google.golang.org/grpc v1.35.0
google.golang.org/protobuf v1.25.0
Expand Down
10 changes: 8 additions & 2 deletions pkg/networkservice/chains/forwarder/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw
tag.NewServer(ctx, vppConn),
mtu.NewServer(vppConn),
mechanisms.NewServer(map[string]networkservice.NetworkServiceServer{
memif.MECHANISM: memif.NewServer(vppConn, memif.WithDirectMemif()),
memif.MECHANISM: memif.NewServer(ctx, vppConn,
memif.WithDirectMemif(),
memif.WithChangeNetNS(),
memif.WithExternalVPP()),
kernel.MECHANISM: kernel.NewServer(vppConn),
vxlan.MECHANISM: vxlan.NewServer(vppConn, tunnelIP, vxlan.WithVniPort(tunnelPort)),
wireguard.MECHANISM: wireguard.NewServer(vppConn, tunnelIP),
Expand All @@ -113,7 +116,10 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw
mtu.NewClient(vppConn),
tag.NewClient(ctx, vppConn),
// mechanisms
memif.NewClient(vppConn),
memif.NewClient(vppConn,
memif.WithChangeNetNS(),
memif.WithExternalVPP(),
),
kernel.NewClient(vppConn),
vxlan.NewClient(vppConn, tunnelIP, vxlan.WithVniPort(tunnelPort)),
wireguard.NewClient(vppConn, tunnelIP),
Expand Down
61 changes: 31 additions & 30 deletions pkg/networkservice/mechanisms/memif/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !windows
//+build linux

package memif

Expand All @@ -28,39 +28,38 @@ import (
"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/mechanisms/memif/memifproxy"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/switchcase"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/memif"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
"github.com/networkservicemesh/sdk/pkg/tools/postpone"

"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/mechanisms/memif/memifproxy"
)

type memifClient struct {
vppConn api.Connection
vppConn *vppConnection
changeNetNs bool
nsInfo NetNSInfo
}

// NewClient provides a NetworkServiceClient chain elements that support the memif Mechanism
func NewClient(vppConn api.Connection) networkservice.NetworkServiceClient {
m := &memifClient{
vppConn: vppConn,
func NewClient(vppConn api.Connection, options ...Option) networkservice.NetworkServiceClient {
opts := &memifOptions{}
for _, o := range options {
o(opts)
}

return chain.NewNetworkServiceClient(
m,
switchcase.NewClient(&switchcase.ClientCase{
Condition: func(ctx context.Context, conn *networkservice.Connection) bool {
_, ok := loadDirectMemifInfo(ctx)
return !ok
&memifClient{
vppConn: &vppConnection{
isExternal: opts.isVPPExternal,
Connection: vppConn,
},
Client: memifproxy.New(),
}),
changeNetNs: opts.changeNetNS,
nsInfo: newNetNSInfo(),
},
)
}

Expand All @@ -75,11 +74,11 @@ func mechanismsContain(list []*networkservice.Mechanism, t string) bool {

func (m *memifClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
if !mechanismsContain(request.MechanismPreferences, memif.MECHANISM) {
request.MechanismPreferences = append(request.MechanismPreferences, &networkservice.Mechanism{
Cls: cls.LOCAL,
Type: memif.MECHANISM,
Parameters: make(map[string]string),
})
mechanism := memif.ToMechanism(memif.NewAbstract(m.nsInfo.netNSPath))
if m.changeNetNs {
mechanism.SetNetNSURL("")
}
request.MechanismPreferences = append(request.MechanismPreferences, mechanism.Mechanism)
}

postponeCtxFunc := postpone.ContextWithValues(ctx)
Expand All @@ -89,14 +88,16 @@ func (m *memifClient) Request(ctx context.Context, request *networkservice.Netwo
return nil, err
}

// if direct memif enabled save socket filename to metadata
_, ok := loadDirectMemifInfo(ctx)
if mechanism := memif.ToMechanism(conn.GetMechanism()); mechanism != nil && ok {
storeDirectMemifInfo(ctx, directMemifInfo{socketURL: mechanism.GetSocketFileURL()})
return conn, nil
// If direct memif case store mechanism to metadata and return.
if info, ok := memifproxy.LoadInfo(ctx); ok {
if mechanism := memif.ToMechanism(conn.GetMechanism()); mechanism != nil && ok {
info.NSURL = mechanism.GetNetNSURL()
info.SocketFile = mechanism.GetSocketFilename()
return conn, nil
}
}

if err := create(ctx, conn, m.vppConn, metadata.IsClient(m)); err != nil {
if err = create(ctx, conn, m.vppConn, metadata.IsClient(m), m.nsInfo.netNS); err != nil {
closeCtx, cancelClose := postponeCtxFunc()
defer cancelClose()

Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/mechanisms/memif/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !windows
// +build linux

package memif_test

Expand Down
123 changes: 93 additions & 30 deletions pkg/networkservice/mechanisms/memif/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//+build linux

package memif

import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"sync/atomic"
"runtime"
"time"

"git.fd.io/govpp.git/api"
Expand All @@ -33,60 +36,103 @@ import (
"github.com/networkservicemesh/api/pkg/api/networkservice/payload"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"github.com/pkg/errors"
"github.com/vishvananda/netns"
"golang.org/x/sys/unix"

"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/up"
"github.com/networkservicemesh/sdk-vpp/pkg/tools/ifindex"
)

var lastSocketID uint32
type vppConnection struct {
isExternal bool

api.Connection
}

// NetNSInfo contains shared info for server and client
type NetNSInfo struct {
netNS netns.NsHandle
netNSPath string
}

// NewNetNSInfo should be called only once for single chain
func newNetNSInfo() NetNSInfo {
runtime.LockOSThread()
defer runtime.UnlockOSThread()

func createMemifSocket(ctx context.Context, mechanism *memifMech.Mechanism, vppConn api.Connection, isClient bool) (socketID uint32, err error) {
// Extract the socket filename
u, err := url.Parse(mechanism.GetSocketFileURL())
fd, err := unix.Open("/proc/thread-self/ns/net", unix.O_RDONLY|unix.O_CLOEXEC, 0)
if err != nil {
return 0, errors.Wrapf(err, "not a valid url %q", mechanism.GetSocketFileURL())
panic("failed to open '/proc/thread-self/ns/net': " + err.Error())
}
if u.Scheme != memifMech.FileScheme {
return 0, errors.Errorf("socket file url must have scheme %q, actual %q", memifMech.FileScheme, u.Scheme)
netNSPath := fmt.Sprintf("/proc/%d/fd/%d", os.Getpid(), fd)

netNS, err := netns.GetFromPath(netNSPath)
if err != nil {
panic("failed to get current net NS: " + err.Error())
}

// Create the socketID
socketID = atomic.AddUint32(&lastSocketID, 1) // TODO - work out a solution that works long term
now := time.Now()
memifSocketAddDel := &memif.MemifSocketFilenameAddDel{
return NetNSInfo{
netNSPath: netNSPath,
netNS: netNS,
}
}

func createMemifSocket(ctx context.Context, mechanism *memifMech.Mechanism, vppConn *vppConnection, isClient bool, netNS netns.NsHandle) (socketID uint32, err error) {
namespace, err := getNamespace(mechanism, vppConn, netNS)
if err != nil {
return 0, err
}

memifSocketAddDel := &memif.MemifSocketFilenameAddDelV2{
IsAdd: true,
SocketID: socketID,
SocketFilename: u.Path,
SocketID: ^uint32(0),
SocketFilename: mechanism.GetSocketFilename(),
Namespace: namespace,
}
if _, err := memif.NewServiceClient(vppConn).MemifSocketFilenameAddDel(ctx, memifSocketAddDel); err != nil {

now := time.Now()

reply, err := memif.NewServiceClient(vppConn).MemifSocketFilenameAddDelV2(ctx, memifSocketAddDel)
if err != nil {
return 0, errors.WithStack(err)
}
memifSocketAddDel.SocketID = reply.SocketID

log.FromContext(ctx).
WithField("SocketID", memifSocketAddDel.SocketID).
WithField("SocketFilename", memifSocketAddDel.SocketFilename).
WithField("SocketNamespace", memifSocketAddDel.Namespace).
WithField("IsAdd", memifSocketAddDel.IsAdd).
WithField("duration", time.Since(now)).
WithField("vppapi", "MemifSocketFilenameAddDel").Debug("completed")

store(ctx, isClient, memifSocketAddDel)
return socketID, nil

return memifSocketAddDel.SocketID, nil
}

func deleteMemifSocket(ctx context.Context, vppConn api.Connection, isClient bool) error {
memifSocketAddDel, ok := load(ctx, isClient)
if !ok {
return nil
}

memifSocketAddDel.IsAdd = false

now := time.Now()
if _, err := memif.NewServiceClient(vppConn).MemifSocketFilenameAddDel(ctx, memifSocketAddDel); err != nil {

if _, err := memif.NewServiceClient(vppConn).MemifSocketFilenameAddDelV2(ctx, memifSocketAddDel); err != nil {
return errors.WithStack(err)
}

log.FromContext(ctx).
WithField("SocketID", memifSocketAddDel.SocketID).
WithField("SocketFilename", memifSocketAddDel.SocketFilename).
WithField("SocketNamespace", memifSocketAddDel.Namespace).
WithField("IsAdd", memifSocketAddDel.IsAdd).
WithField("duration", time.Since(now)).
WithField("vppapi", "MemifSocketFilenameAddDel").Debug("completed")

return nil
}

Expand Down Expand Up @@ -133,7 +179,7 @@ func createMemif(ctx context.Context, vppConn api.Connection, socketID uint32, m
}

func deleteMemif(ctx context.Context, vppConn api.Connection, isClient bool) error {
swIfIndex, ok := ifindex.LoadAndDelete(ctx, isClient)
swIfIndex, ok := ifindex.Load(ctx, isClient)
if !ok {
return nil
}
Expand All @@ -152,23 +198,20 @@ func deleteMemif(ctx context.Context, vppConn api.Connection, isClient bool) err
return nil
}

func create(ctx context.Context, conn *networkservice.Connection, vppConn api.Connection, isClient bool) error {
func create(ctx context.Context, conn *networkservice.Connection, vppConn *vppConnection, isClient bool, netNS netns.NsHandle) error {
if mechanism := memifMech.ToMechanism(conn.GetMechanism()); mechanism != nil {
// This connection has already been created
if _, ok := ifindex.Load(ctx, isClient); ok {
return nil
}
if !isClient {
if err := os.MkdirAll(filepath.Dir(socketFile(conn)), 0700); err != nil {
return errors.Wrapf(err, "failed to create memif socket directory %s", socketFile(conn))
}
mechanism.SetSocketFileURL((&url.URL{Scheme: memifMech.FileScheme, Path: socketFile(conn)}).String())
mechanism.SetSocketFilename(socketFile(conn))
}
mode := memif.MEMIF_MODE_API_IP
if conn.GetPayload() == payload.Ethernet {
mode = memif.MEMIF_MODE_API_ETHERNET
}
socketID, err := createMemifSocket(ctx, mechanism, vppConn, isClient)
socketID, err := createMemifSocket(ctx, mechanism, vppConn, isClient, netNS)
if err != nil {
return err
}
Expand All @@ -187,15 +230,35 @@ func del(ctx context.Context, conn *networkservice.Connection, vppConn api.Conne
if err := deleteMemifSocket(ctx, vppConn, isClient); err != nil {
return err
}
if !isClient {
if err := os.RemoveAll(filepath.Dir(socketFile(conn))); err != nil {
return errors.Wrapf(err, "failed to delete %s", filepath.Dir(socketFile(conn)))
}
}
}
return nil
}

func socketFile(conn *networkservice.Connection) string {
return filepath.Join(os.TempDir(), "memif", conn.GetId(), "memif.socket")
return "@" + filepath.Join(os.TempDir(), "memif", conn.GetId(), "memif.socket")
}

func getNamespace(mechanism *memifMech.Mechanism, vppConn *vppConnection, netNS netns.NsHandle) (string, error) {
u, err := url.Parse(mechanism.GetNetNSURL())
if err != nil {
return "", errors.Wrapf(err, "not a valid url %s", mechanism.GetNetNSURL())
}
if u.Scheme != memifMech.FileScheme {
return "", errors.Errorf("socket file url must have scheme %s, actual %s", memifMech.FileScheme, u.Scheme)
}

if vppConn.isExternal {
return u.Path, nil
}

targetNetNS, err := netns.GetFromPath(u.Path)
if err != nil {
return "", err
}
defer func() { _ = targetNetNS.Close() }()

if targetNetNS.Equal(netNS) {
return "", nil
}
return u.Path, nil
}
Loading