Skip to content

Commit

Permalink
Merge pull request #49 from glazychev-art/log
Browse files Browse the repository at this point in the history
Update to latest sdk version
  • Loading branch information
haiodo authored Feb 9, 2021
2 parents 646833e + 5c8d1ec commit 3043b50
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 35 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/golang/protobuf v1.4.3
github.com/google/uuid v1.1.2
github.com/networkservicemesh/api v0.0.0-20210202152048-ec956057eb3a
github.com/networkservicemesh/sdk v0.0.0-20210208092844-64f6aa269f63
github.com/networkservicemesh/sdk v0.0.0-20210209072519-ac14b1d8f2ec
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.7.0
github.com/stretchr/testify v1.6.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ github.com/nats-io/stan.go v0.6.0/go.mod h1:eIcD5bi3pqbHT/xIIvXMwvzXYElgouBvaVRf
github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d/go.mod h1:o96djdrsSGy3AWPyBgZMAGfxZNfgntdJG+11KU4QvbU=
github.com/networkservicemesh/api v0.0.0-20210202152048-ec956057eb3a h1:26MmmU399gnahAC+f6OfWsP5MpnLdJ4UjvBa6fXYvNU=
github.com/networkservicemesh/api v0.0.0-20210202152048-ec956057eb3a/go.mod h1:qvxdY1Zt4QTtiG+uH1XmjpegeHjlt5Jj4A8iK55iJPI=
github.com/networkservicemesh/sdk v0.0.0-20210208092844-64f6aa269f63 h1:fDvXYI18q3+DIN2zuljZXavL7IILWun2zafTa4llUG8=
github.com/networkservicemesh/sdk v0.0.0-20210208092844-64f6aa269f63/go.mod h1:gqv+RfDum4HBeZA0k+F96mT2m0rh9LZTUazjtsLJ0Bs=
github.com/networkservicemesh/sdk v0.0.0-20210209072519-ac14b1d8f2ec h1:DjdZBDjKzXRCE+Bzk1wu20FlA7Cu8J0aPDfkv86/6sQ=
github.com/networkservicemesh/sdk v0.0.0-20210209072519-ac14b1d8f2ec/go.mod h1:gqv+RfDum4HBeZA0k+F96mT2m0rh9LZTUazjtsLJ0Bs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nishanths/exhaustive v0.0.0-20200811152831-6cf413ae40e0/go.mod h1:wBEpHwM2OdmeNpdCvRPUlkEbBuaFmcK4Wv8Q7FuGW3c=
Expand Down
34 changes: 17 additions & 17 deletions pkg/tools/deviceplugin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"

"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
"github.com/networkservicemesh/sdk/pkg/tools/logger"
"github.com/networkservicemesh/sdk/pkg/tools/log"

"github.com/networkservicemesh/sdk-k8s/pkg/tools/socketpath"
)
Expand All @@ -57,11 +57,11 @@ func NewClient(devicePluginPath string) *Client {

// StartDeviceServer starts device plugin server and returns the name of the corresponding unix socket
func (c *Client) StartDeviceServer(ctx context.Context, deviceServer pluginapi.DevicePluginServer) (string, error) {
logEntry := logger.Log(ctx).WithField("Client", "StartDeviceServer")
logger := log.FromContext(ctx).WithField("Client", "StartDeviceServer")

socket := uuid.New().String()
socketPath := socketpath.SocketPath(path.Join(c.devicePluginPath, socket))
logEntry.Infof("socket = %v", socket)
logger.Infof("socket = %v", socket)
if err := socketpath.SocketCleanup(socketPath); err != nil {
return "", err
}
Expand All @@ -78,29 +78,29 @@ func (c *Client) StartDeviceServer(ctx context.Context, deviceServer pluginapi.D
}
go func() {
if err := <-errCh; err != nil {
logEntry.Fatalf("error in device plugin server at %s: %s", socket, err.Error())
logger.Fatalf("error in device plugin server at %s: %s", socket, err.Error())
}
}()

dialCtx, cancel := context.WithTimeout(ctx, dialTimeoutDefault)
defer cancel()

logEntry.Info("check device server operational")
logger.Info("check device server operational")
conn, err := grpc.DialContext(dialCtx, socketURL.String(), grpc.WithBlock(), grpc.WithInsecure())
if err != nil {
logEntry.Errorf("failed to dial kubelet api: %s", err.Error())
logger.Errorf("failed to dial kubelet api: %s", err.Error())
return "", err
}
_ = conn.Close()

logEntry.Info("device server is operational")
logger.Info("device server is operational")

return socket, nil
}

// RegisterDeviceServer registers device plugin server using the given request
func (c *Client) RegisterDeviceServer(ctx context.Context, request *pluginapi.RegisterRequest) error {
logEntry := logger.Log(ctx).WithField("Client", "RegisterDeviceServer")
logger := log.FromContext(ctx).WithField("Client", "RegisterDeviceServer")

socketURL := grpcutils.AddressToURL(socketpath.SocketPath(c.devicePluginSocket))
conn, err := grpc.DialContext(ctx, socketURL.String(), grpc.WithInsecure())
Expand All @@ -110,22 +110,22 @@ func (c *Client) RegisterDeviceServer(ctx context.Context, request *pluginapi.Re
defer func() { _ = conn.Close() }()

client := pluginapi.NewRegistrationClient(conn)
logEntry.Info("trying to register to device plugin kubelet service")
logger.Info("trying to register to device plugin kubelet service")
if _, err = client.Register(context.Background(), request); err != nil {
return errors.Wrap(err, "cannot register to device plugin kubelet service")
}
logEntry.Info("register done")
logger.Info("register done")

return nil
}

// MonitorKubeletRestart monitors if kubelet restarts so we need to re register device plugin server
func (c *Client) MonitorKubeletRestart(ctx context.Context) (chan struct{}, error) {
logEntry := logger.Log(ctx).WithField("Client", "MonitorKubeletRestart")
logger := log.FromContext(ctx).WithField("Client", "MonitorKubeletRestart")

watcher, err := watchOn(c.devicePluginPath)
if err != nil {
logEntry.Errorf("failed to watch on %v", c.devicePluginPath)
logger.Errorf("failed to watch on %v", c.devicePluginPath)
return nil, err
}

Expand All @@ -136,23 +136,23 @@ func (c *Client) MonitorKubeletRestart(ctx context.Context) (chan struct{}, erro
for {
select {
case <-ctx.Done():
logEntry.Info("end monitoring")
logger.Info("end monitoring")
return
case event, ok := <-watcher.Events:
if !ok {
logEntry.Info("watcher has been closed")
logger.Info("watcher has been closed")
return
}
if event.Name == c.devicePluginSocket && event.Op&fsnotify.Create == fsnotify.Create {
logEntry.Warn("kubelet restarts")
logger.Warn("kubelet restarts")
monitorCh <- struct{}{}
}
case err, ok := <-watcher.Errors:
if !ok {
logEntry.Info("watcher has been closed")
logger.Info("watcher has been closed")
return
}
logEntry.Warn(err)
logger.Warn(err)
}
}
}()
Expand Down
5 changes: 1 addition & 4 deletions pkg/tools/deviceplugin/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"testing"
"time"

"github.com/networkservicemesh/sdk/pkg/tools/logger"

"github.com/stretchr/testify/require"

"github.com/networkservicemesh/sdk-k8s/pkg/tools/deviceplugin"
Expand All @@ -40,15 +38,14 @@ const (
func TestDevicePluginManager_MonitorKubeletRestart(t *testing.T) {
devicePluginPath := path.Join(os.TempDir(), t.Name())
devicePluginSocket := path.Join(devicePluginPath, kubeletSocket)
ctx := logger.WithLog(context.Background())

c := deviceplugin.NewClient(devicePluginPath)

_ = os.RemoveAll(devicePluginPath)
err := os.MkdirAll(devicePluginPath, os.ModeDir|os.ModePerm)
require.NoError(t, err)

monitorCh, err := c.MonitorKubeletRestart(ctx)
monitorCh, err := c.MonitorKubeletRestart(context.Background())
require.NoError(t, err)

_, err = os.Create(devicePluginSocket)
Expand Down
14 changes: 7 additions & 7 deletions pkg/tools/k8stest/deviceplugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"

"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
"github.com/networkservicemesh/sdk/pkg/tools/logger"
"github.com/networkservicemesh/sdk/pkg/tools/log"

"github.com/networkservicemesh/sdk-k8s/pkg/tools/socketpath"
)
Expand All @@ -44,32 +44,32 @@ func StartRegistrationServer(devicePluginPath string, server *grpc.Server) {
}

func (rs *registrationServer) Register(ctx context.Context, request *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
logEntry := logger.Log(ctx).WithField("registrationServer", "Register")
logger := log.FromContext(ctx).WithField("registrationServer", "Register")

socketPath := socketpath.SocketPath(path.Join(rs.devicePluginPath, request.Endpoint))
socketURL := grpcutils.AddressToURL(socketPath)
conn, err := grpc.DialContext(ctx, socketURL.String(), grpc.WithInsecure())
if err != nil {
logEntry.Errorf("failed to connect to %v", socketPath.String())
logger.Errorf("failed to connect to %v", socketPath.String())
return nil, err
}

client := pluginapi.NewDevicePluginClient(conn)
receiver, err := client.ListAndWatch(ctx, &pluginapi.Empty{})
if err != nil {
logEntry.Errorf("failed to ListAndWatch on %v", socketPath.String())
logger.Errorf("failed to ListAndWatch on %v", socketPath.String())
return nil, err
}

go func() {
logEntry.Info("client started")
logger.Info("client started")
for {
response, err := receiver.Recv()
if err != nil {
logEntry.Infof("client closed: %+v", err)
logger.Infof("client closed: %+v", err)
return
}
logEntry.Infof("devices update -> %v", response.Devices)
logger.Infof("devices update -> %v", response.Devices)
}
}()

Expand Down
8 changes: 4 additions & 4 deletions pkg/tools/podresources/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
podresources "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"

"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
"github.com/networkservicemesh/sdk/pkg/tools/logger"
"github.com/networkservicemesh/sdk/pkg/tools/log"

"github.com/networkservicemesh/sdk-k8s/pkg/tools/socketpath"
)
Expand All @@ -51,18 +51,18 @@ func NewClient(podResourcesPath string) *Client {

// GetPodResourcesListerClient returns a new PodResourcesListerClient
func (km *Client) GetPodResourcesListerClient(ctx context.Context) (podresources.PodResourcesListerClient, error) {
logEntry := logger.Log(ctx).WithField("podresources.Client", "GetPodResourcesListerClient")
logger := log.FromContext(ctx).WithField("podresources.Client", "GetPodResourcesListerClient")

socketURL := grpcutils.AddressToURL(socketpath.SocketPath(km.podResourcesSocket))
conn, err := grpc.DialContext(ctx, socketURL.String(), grpc.WithInsecure())
if err != nil {
return nil, errors.Wrap(err, "cannot connect to pod resources kubelet service")
}

logEntry.Info("start pod resources lister client")
logger.Info("start pod resources lister client")
go func() {
<-ctx.Done()
logEntry.Info("close pod resources lister client")
logger.Info("close pod resources lister client")
_ = conn.Close()
}()

Expand Down

0 comments on commit 3043b50

Please sign in to comment.