From 01a239a69319605563ce8b099c96eb82c109d716 Mon Sep 17 00:00:00 2001 From: stefanprodan Date: Tue, 31 Mar 2020 16:31:48 +0300 Subject: [PATCH 1/2] Add portforward package Based on https://github.com/justinbarrick/go-k8s-portforward --- cmd/fluxctl/portforward.go | 3 +- go.mod | 18 --- go.sum | 2 - pkg/portforward/portforward.go | 207 ++++++++++++++++++++++++++++ pkg/portforward/portforward_test.go | 194 ++++++++++++++++++++++++++ 5 files changed, 403 insertions(+), 21 deletions(-) create mode 100644 pkg/portforward/portforward.go create mode 100644 pkg/portforward/portforward_test.go diff --git a/cmd/fluxctl/portforward.go b/cmd/fluxctl/portforward.go index a1d475303..3e8cef2cd 100644 --- a/cmd/fluxctl/portforward.go +++ b/cmd/fluxctl/portforward.go @@ -4,7 +4,8 @@ import ( "fmt" "strings" - portforward "github.com/justinbarrick/go-k8s-portforward" + "github.com/fluxcd/flux/pkg/portforward" + "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" diff --git a/go.mod b/go.mod index 82420d337..ce0a2d2c1 100644 --- a/go.mod +++ b/go.mod @@ -29,38 +29,25 @@ replace ( replace github.com/fluxcd/flux/pkg/install => ./pkg/install require ( - github.com/Azure/go-autorest v11.7.1+incompatible // indirect github.com/Jeffail/gabs v1.4.0 - github.com/Masterminds/semver v1.5.0 // indirect github.com/Masterminds/semver/v3 v3.0.3 - github.com/Masterminds/sprig v2.22.0+incompatible // indirect - github.com/VividCortex/gohistogram v1.0.0 // indirect github.com/aws/aws-sdk-go v1.27.1 github.com/bradfitz/gomemcache v0.0.0-20190329173943-551aad21a668 github.com/cheggaaa/pb/v3 v3.0.2 - github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/docker/distribution v2.7.1+incompatible - github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f // indirect - github.com/elazarl/goproxy/ext v0.0.0-20190421051319-9d40249d3c2f // indirect github.com/evanphx/json-patch v4.5.0+incompatible github.com/fluxcd/flux/pkg/install v0.0.0-00010101000000-000000000000 github.com/fluxcd/helm-operator v1.0.0-rc6 - github.com/fluxcd/helm-operator/pkg/install v0.0.0-00010101000000-000000000000 // indirect github.com/ghodss/yaml v1.0.0 github.com/go-kit/kit v0.9.0 - github.com/gogo/googleapis v1.3.1 // indirect - github.com/gogo/status v1.1.0 // indirect github.com/golang/gddo v0.0.0-20190312205958-5a2505f3dbf0 github.com/google/go-containerregistry v0.0.0-20200121192426-b0ae1fc74a66 github.com/google/go-github/v28 v28.1.1 github.com/gorilla/mux v1.7.3 github.com/gorilla/websocket v1.4.0 github.com/imdario/mergo v0.3.8 - github.com/justinbarrick/go-k8s-portforward v1.0.4-0.20190722134107-d79fe1b9d79d - github.com/ncabatoff/go-seq v0.0.0-20180805175032-b08ef85ed833 // indirect github.com/opencontainers/go-digest v1.0.0-rc1 github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9 // indirect - github.com/opentracing/opentracing-go v1.1.0 // indirect github.com/pkg/errors v0.8.1 github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942 github.com/prometheus/client_golang v1.2.1 @@ -69,11 +56,8 @@ require ( github.com/spf13/cobra v0.0.5 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.4.0 - github.com/uber/jaeger-client-go v2.21.1+incompatible // indirect - github.com/uber/jaeger-lib v2.2.0+incompatible // indirect github.com/weaveworks/common v0.0.0-20190410110702-87611edc252e github.com/weaveworks/go-checkpoint v0.0.0-20170503165305-ebbb8b0518ab - github.com/weaveworks/promrus v1.2.0 // indirect github.com/whilp/git-urls v0.0.0-20160530060445-31bac0d230fa github.com/xeipuuv/gojsonschema v1.1.0 go.mozilla.org/sops/v3 v3.5.0 @@ -81,11 +65,9 @@ require ( golang.org/x/sys v0.0.0-20191028164358-195ce5e7f934 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 gopkg.in/yaml.v2 v2.2.8 - helm.sh/helm/v3 v3.0.3 // indirect k8s.io/api v0.17.4 k8s.io/apiextensions-apiserver v0.17.4 k8s.io/apimachinery v0.17.4 k8s.io/client-go v11.0.0+incompatible - k8s.io/helm v2.16.1+incompatible // indirect k8s.io/klog v1.0.0 ) diff --git a/go.sum b/go.sum index ed139ba35..caa6eeb3d 100644 --- a/go.sum +++ b/go.sum @@ -432,8 +432,6 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/justinbarrick/go-k8s-portforward v1.0.2/go.mod h1:klMOboLnC1/UlkyJnYFjcMcbOtwAcKop+LkIZ4r428o= -github.com/justinbarrick/go-k8s-portforward v1.0.3/go.mod h1:GkvGI25j2iHpJVINl/hZC+sbf9IJ1XkY1MtjSh3Usuk= -github.com/justinbarrick/go-k8s-portforward v1.0.4-0.20190722134107-d79fe1b9d79d h1:xQ/ZtcWCKzWg5QbOhq6RFPvevl+IE580Vm0Vgxuw3xs= github.com/justinbarrick/go-k8s-portforward v1.0.4-0.20190722134107-d79fe1b9d79d/go.mod h1:GkvGI25j2iHpJVINl/hZC+sbf9IJ1XkY1MtjSh3Usuk= github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= diff --git a/pkg/portforward/portforward.go b/pkg/portforward/portforward.go new file mode 100644 index 000000000..dc1c970a5 --- /dev/null +++ b/pkg/portforward/portforward.go @@ -0,0 +1,207 @@ +package portforward + +// based on https://github.com/justinbarrick/go-k8s-portforward + +import ( + "fmt" + "io/ioutil" + "net" + "net/http" + + "github.com/pkg/errors" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/httpstream" + "k8s.io/client-go/kubernetes" + _ "k8s.io/client-go/plugin/pkg/client/auth" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/transport/spdy" +) + +// Used for creating a port forward into a Kubernetes pod +// in a Kubernetes cluster. +type PortForward struct { + // The parsed Kubernetes configuration file. + Config *rest.Config + // The initialized Kubernetes client. + Clientset kubernetes.Interface + // The pod name to use, required if Labels is empty. + Name string + // The labels to use to find the pod. + Labels metav1.LabelSelector + // The port on the pod to forward traffic to. + DestinationPort int + // The port that the port forward should listen to, random if not set. + ListenPort int + // The namespace to look for the pod in. + Namespace string + stopChan chan struct{} + readyChan chan struct{} +} + +// Initialize a port forwarder, loads the Kubernetes configuration file and creates the client. +// You do not need to use this function if you have a client to use already - the PortForward +// struct can be created directly. +func NewPortForwarder(namespace string, labels metav1.LabelSelector, port int) (*PortForward, error) { + pf := &PortForward{ + Namespace: namespace, + Labels: labels, + DestinationPort: port, + } + + var err error + pf.Config, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + clientcmd.NewDefaultClientConfigLoadingRules(), + &clientcmd.ConfigOverrides{}, + ).ClientConfig() + if err != nil { + return pf, errors.Wrap(err, "Could not load kubernetes configuration file") + } + + pf.Clientset, err = kubernetes.NewForConfig(pf.Config) + if err != nil { + return pf, errors.Wrap(err, "Could not create kubernetes client") + } + + return pf, nil +} + +// Start a port forward to a pod - blocks until the tunnel is ready for use. +func (p *PortForward) Start() error { + p.stopChan = make(chan struct{}, 1) + readyChan := make(chan struct{}, 1) + errChan := make(chan error, 1) + + listenPort, err := p.getListenPort() + if err != nil { + return errors.Wrap(err, "Could not find a port to bind to") + } + + dialer, err := p.dialer() + if err != nil { + return errors.Wrap(err, "Could not create a dialer") + } + + ports := []string{ + fmt.Sprintf("%d:%d", listenPort, p.DestinationPort), + } + + discard := ioutil.Discard + pf, err := portforward.New(dialer, ports, p.stopChan, readyChan, discard, discard) + if err != nil { + return errors.Wrap(err, "Could not port forward into pod") + } + + go func() { + errChan <- pf.ForwardPorts() + }() + + select { + case err = <-errChan: + return errors.Wrap(err, "Could not create port forward") + case <-readyChan: + return nil + } + + return nil +} + +// Stop a port forward. +func (p *PortForward) Stop() { + p.stopChan <- struct{}{} +} + +// Returns the port that the port forward should listen on. +// If ListenPort is set, then it returns ListenPort. +// Otherwise, it will call getFreePort() to find an open port. +func (p *PortForward) getListenPort() (int, error) { + var err error + + if p.ListenPort == 0 { + p.ListenPort, err = p.getFreePort() + } + + return p.ListenPort, err +} + +// Get a free port on the system by binding to port 0, checking +// the bound port number, and then closing the socket. +func (p *PortForward) getFreePort() (int, error) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return 0, err + } + + port := listener.Addr().(*net.TCPAddr).Port + err = listener.Close() + if err != nil { + return 0, err + } + + return port, nil +} + +// Create an httpstream.Dialer for use with portforward.New +func (p *PortForward) dialer() (httpstream.Dialer, error) { + pod, err := p.getPodName() + if err != nil { + return nil, errors.Wrap(err, "Could not get pod name") + } + + url := p.Clientset.CoreV1().RESTClient().Post(). + Resource("pods"). + Namespace(p.Namespace). + Name(pod). + SubResource("portforward").URL() + + transport, upgrader, err := spdy.RoundTripperFor(p.Config) + if err != nil { + return nil, errors.Wrap(err, "Could not create round tripper") + } + + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) + return dialer, nil +} + +// Gets the pod name to port forward to, if Name is set, Name is returned. Otherwise, +// it will call findPodByLabels(). +func (p *PortForward) getPodName() (string, error) { + var err error + if p.Name == "" { + p.Name, err = p.findPodByLabels() + } + return p.Name, err +} + +// Find the name of a pod by label, returns an error if the label returns +// more or less than one pod. +// It searches for the labels specified by labels. +func (p *PortForward) findPodByLabels() (string, error) { + if len(p.Labels.MatchLabels) == 0 && len(p.Labels.MatchExpressions) == 0 { + return "", errors.New("No pod labels specified") + } + + pods, err := p.Clientset.CoreV1().Pods(p.Namespace).List(metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(&p.Labels), + FieldSelector: fields.OneTermEqualSelector("status.phase", string(v1.PodRunning)).String(), + }) + + if err != nil { + return "", errors.Wrap(err, "Listing pods in kubernetes") + } + + formatted := metav1.FormatLabelSelector(&p.Labels) + + if len(pods.Items) == 0 { + return "", errors.New(fmt.Sprintf("Could not find running pod for selector: labels \"%s\"", formatted)) + } + + if len(pods.Items) != 1 { + return "", errors.New(fmt.Sprintf("Ambiguous pod: found more than one pod for selector: labels \"%s\"", formatted)) + } + + return pods.Items[0].ObjectMeta.Name, nil +} diff --git a/pkg/portforward/portforward_test.go b/pkg/portforward/portforward_test.go new file mode 100644 index 000000000..619714e21 --- /dev/null +++ b/pkg/portforward/portforward_test.go @@ -0,0 +1,194 @@ +package portforward + +// based on https://github.com/justinbarrick/go-k8s-portforward + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func newPod(name string, labels map[string]string) *corev1.Pod { + return &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + Name: name, + }, + } +} + +func TestFindPodByLabels(t *testing.T) { + pf := PortForward{ + Clientset: fake.NewSimpleClientset( + newPod("mypod1", map[string]string{ + "name": "other", + }), + newPod("mypod2", map[string]string{ + "name": "flux", + }), + newPod("mypod3", map[string]string{})), + Labels: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "name": "flux", + }, + }, + } + + pod, err := pf.findPodByLabels() + assert.Nil(t, err) + assert.Equal(t, "mypod2", pod) +} + +func TestFindPodByLabelsNoneExist(t *testing.T) { + pf := PortForward{ + Clientset: fake.NewSimpleClientset( + newPod("mypod1", map[string]string{ + "name": "other", + })), + Labels: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "name": "flux", + }, + }, + } + + _, err := pf.findPodByLabels() + assert.NotNil(t, err) + assert.Equal(t, "Could not find pod for selector: labels \"name=flux\"", err.Error()) +} + +func TestFindPodByLabelsMultiple(t *testing.T) { + pf := PortForward{ + Clientset: fake.NewSimpleClientset( + newPod("mypod1", map[string]string{ + "name": "flux", + }), + newPod("mypod2", map[string]string{ + "name": "flux", + }), + newPod("mypod3", map[string]string{})), + Labels: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "name": "flux", + }, + }, + } + + _, err := pf.findPodByLabels() + assert.NotNil(t, err) + assert.Equal(t, "Ambiguous pod: found more than one pod for selector: labels \"name=flux\"", err.Error()) +} + +func TestFindPodByLabelsExpression(t *testing.T) { + pf := PortForward{ + Clientset: fake.NewSimpleClientset( + newPod("mypod1", map[string]string{ + "name": "lol", + }), + newPod("mypod2", map[string]string{ + "name": "fluxd", + }), + newPod("mypod3", map[string]string{})), + Labels: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + metav1.LabelSelectorRequirement{ + Key: "name", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"flux", "fluxd"}, + }, + }, + }, + } + + pod, err := pf.findPodByLabels() + assert.Nil(t, err) + assert.Equal(t, "mypod2", pod) +} + +func TestFindPodByLabelsExpressionNotFound(t *testing.T) { + pf := PortForward{ + Clientset: fake.NewSimpleClientset( + newPod("mypod1", map[string]string{ + "name": "lol", + }), + newPod("mypod2", map[string]string{ + "name": "lol", + }), + newPod("mypod3", map[string]string{})), + Labels: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + metav1.LabelSelectorRequirement{ + Key: "name", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"flux", "fluxd"}, + }, + }, + }, + } + + _, err := pf.findPodByLabels() + assert.NotNil(t, err) + assert.Equal(t, "Could not find pod for selector: labels \"name in (flux,fluxd)\"", err.Error()) +} + +func TestGetPodNameNameSet(t *testing.T) { + pf := PortForward{ + Name: "hello", + } + + pod, err := pf.getPodName() + assert.Nil(t, err) + assert.Equal(t, "hello", pod) +} + +func TestGetPodNameNoNameSet(t *testing.T) { + pf := PortForward{ + Clientset: fake.NewSimpleClientset( + newPod("mypod", map[string]string{ + "name": "flux", + })), + Labels: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "name": "flux", + }, + }, + } + + pod, err := pf.getPodName() + assert.Nil(t, err) + assert.Equal(t, "mypod", pod) + assert.Equal(t, pf.Name, pod) +} + +func TestGetFreePort(t *testing.T) { + pf := PortForward{} + port, err := pf.getFreePort() + assert.Nil(t, err) + assert.NotZero(t, port) +} + +func TestGetListenPort(t *testing.T) { + pf := PortForward{ + ListenPort: 80, + } + + port, err := pf.getListenPort() + assert.Nil(t, err) + assert.Equal(t, 80, port) +} + +func TestGetListenPortRandom(t *testing.T) { + pf := PortForward{} + + port, err := pf.getListenPort() + assert.Nil(t, err) + assert.NotZero(t, port) + assert.Equal(t, pf.ListenPort, port) +} From 9c13e34165aa22d0b3fc0506062e6d765777fdaa Mon Sep 17 00:00:00 2001 From: stefanprodan Date: Tue, 31 Mar 2020 16:45:24 +0300 Subject: [PATCH 2/2] Fix portforward unit tests --- pkg/portforward/portforward.go | 1 + pkg/portforward/portforward_test.go | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/portforward/portforward.go b/pkg/portforward/portforward.go index dc1c970a5..c752aad41 100644 --- a/pkg/portforward/portforward.go +++ b/pkg/portforward/portforward.go @@ -1,6 +1,7 @@ package portforward // based on https://github.com/justinbarrick/go-k8s-portforward +// licensed under the Apache License 2.0 import ( "fmt" diff --git a/pkg/portforward/portforward_test.go b/pkg/portforward/portforward_test.go index 619714e21..3399fb118 100644 --- a/pkg/portforward/portforward_test.go +++ b/pkg/portforward/portforward_test.go @@ -1,6 +1,7 @@ package portforward // based on https://github.com/justinbarrick/go-k8s-portforward +// licensed under the Apache License 2.0 import ( "testing" @@ -61,7 +62,7 @@ func TestFindPodByLabelsNoneExist(t *testing.T) { _, err := pf.findPodByLabels() assert.NotNil(t, err) - assert.Equal(t, "Could not find pod for selector: labels \"name=flux\"", err.Error()) + assert.Equal(t, "Could not find running pod for selector: labels \"name=flux\"", err.Error()) } func TestFindPodByLabelsMultiple(t *testing.T) { @@ -135,7 +136,7 @@ func TestFindPodByLabelsExpressionNotFound(t *testing.T) { _, err := pf.findPodByLabels() assert.NotNil(t, err) - assert.Equal(t, "Could not find pod for selector: labels \"name in (flux,fluxd)\"", err.Error()) + assert.Equal(t, "Could not find running pod for selector: labels \"name in (flux,fluxd)\"", err.Error()) } func TestGetPodNameNameSet(t *testing.T) {