Skip to content

Commit

Permalink
Added fix for RPC port detection (#5715)
Browse files Browse the repository at this point in the history
Co-authored-by: Brian de Alwis <bdealwis@google.com>
  • Loading branch information
dat-boris and briandealwis authored Apr 27, 2021
1 parent 7d1db11 commit 7aaa1c7
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (k *KubectlForwarder) forward(parentCtx context.Context, pfe *portForwardEn
}
pfe.terminationLock.Unlock()

if !isPortFree(pfe.localPort) {
if !isPortFree(util.Loopback, pfe.localPort) {
// Assuming that Skaffold brokered ports don't overlap, this has to be an external process that started
// since the dev loop kicked off. We are notifying the user in the hope that they can fix it
color.Red.Fprintf(k.out, "failed to port forward %v, port %d is taken, retrying...\n", pfe, pfe.localPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestUnavailablePort(t *testing.T) {
// has been called
var portFreeWG sync.WaitGroup
portFreeWG.Add(1)
t.Override(&isPortFree, func(int) bool {
t.Override(&isPortFree, func(string, int) bool {
portFreeWG.Done()
return false
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/kubernetes/portforward/pod_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (p *WatchingPodForwarder) podForwardingEntry(resourceVersion, containerName
}

// retrieve an open port on the host
entry.localPort = retrieveAvailablePort(resource.Port.IntVal, &p.entryManager.forwardedPorts)
entry.localPort = retrieveAvailablePort(resource.Address, resource.Port.IntVal, &p.entryManager.forwardedPorts)

return entry, nil
}
3 changes: 2 additions & 1 deletion pkg/skaffold/kubernetes/portforward/pod_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
schemautil "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/util"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/GoogleContainerTools/skaffold/testutil"
testEvent "github.com/GoogleContainerTools/skaffold/testutil/event"
)
Expand Down Expand Up @@ -403,7 +404,7 @@ func TestAutomaticPortForwardPod(t *testing.T) {
testutil.Run(t, test.description, func(t *testutil.T) {
testEvent.InitializeState([]latest.Pipeline{{}})
taken := map[int]struct{}{}
t.Override(&retrieveAvailablePort, mockRetrieveAvailablePort(taken, test.availablePorts))
t.Override(&retrieveAvailablePort, mockRetrieveAvailablePort(util.Loopback, taken, test.availablePorts))
t.Override(&topLevelOwnerKey, func(context.Context, metav1.Object, string) string { return "owner" })

if test.forwarder == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubectl"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
schemautil "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/util"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
)

// SimulateDevCycle is used for testing a port forward + stop + restart in a simulated dev cycle
Expand All @@ -37,7 +38,7 @@ func SimulateDevCycle(t *testing.T, kubectlCLI *kubectl.CLI, namespace string) {
defer func() { portForwardEvent = portForwardEventHandler }()
portForwardEvent = func(entry *portForwardEntry) {}
ctx := context.Background()
localPort := retrieveAvailablePort(9000, &em.forwardedPorts)
localPort := retrieveAvailablePort(util.Loopback, 9000, &em.forwardedPorts)
pfe := newPortForwardEntry(0, latest.PortForwardResource{
Type: "deployment",
Name: "leeroy-web",
Expand All @@ -50,7 +51,7 @@ func SimulateDevCycle(t *testing.T, kubectlCLI *kubectl.CLI, namespace string) {

logrus.Info("waiting for the same port to become available...")
if err := wait.Poll(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
nextPort := retrieveAvailablePort(localPort, &em.forwardedPorts)
nextPort := retrieveAvailablePort(util.Loopback, localPort, &em.forwardedPorts)

logrus.Infof("next port %d", nextPort)

Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/kubernetes/portforward/resource_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (p *ResourceForwarder) getCurrentEntry(resource latest.PortForwardResource)
if requestPort == 0 && resource.Port.IntVal >= 1024 {
requestPort = resource.Port.IntVal
}
entry.localPort = retrieveAvailablePort(requestPort, &p.entryManager.forwardedPorts)
entry.localPort = retrieveAvailablePort(resource.Address, requestPort, &p.entryManager.forwardedPorts)
return entry
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/skaffold/kubernetes/portforward/resource_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ func newTestForwarder() *testForwarder {
return &testForwarder{}
}

func mockRetrieveAvailablePort(taken map[int]struct{}, availablePorts []int) func(int, *util.PortSet) int {
func mockRetrieveAvailablePort(_ string, taken map[int]struct{}, availablePorts []int) func(string, int, *util.PortSet) int {
// Return first available port in ports that isn't taken
var lock sync.Mutex
return func(int, *util.PortSet) int {
return func(string, int, *util.PortSet) int {
for _, p := range availablePorts {
lock.Lock()
if _, ok := taken[p]; ok {
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestStart(t *testing.T) {
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
testEvent.InitializeState([]latest.Pipeline{{}})
t.Override(&retrieveAvailablePort, mockRetrieveAvailablePort(map[int]struct{}{}, test.availablePorts))
t.Override(&retrieveAvailablePort, mockRetrieveAvailablePort(util.Loopback, map[int]struct{}{}, test.availablePorts))
t.Override(&retrieveServices, func(context.Context, string, []string) ([]*latest.PortForwardResource, error) {
return test.resources, nil
})
Expand Down Expand Up @@ -201,9 +201,9 @@ func TestGetCurrentEntryFunc(t *testing.T) {

for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
t.Override(&retrieveAvailablePort, func(req int, ps *util.PortSet) int {
t.Override(&retrieveAvailablePort, func(addr string, req int, ps *util.PortSet) int {
t.CheckDeepEqual(test.expectedReq, req)
return mockRetrieveAvailablePort(map[int]struct{}{}, test.availablePorts)(req, ps)
return mockRetrieveAvailablePort(util.Loopback, map[int]struct{}{}, test.availablePorts)(addr, req, ps)
})

entryManager := NewEntryManager(ioutil.Discard, newTestForwarder())
Expand Down Expand Up @@ -267,7 +267,7 @@ func TestUserDefinedResources(t *testing.T) {
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
testEvent.InitializeState([]latest.Pipeline{{}})
t.Override(&retrieveAvailablePort, mockRetrieveAvailablePort(map[int]struct{}{}, []int{8080, 9000}))
t.Override(&retrieveAvailablePort, mockRetrieveAvailablePort(util.Loopback, map[int]struct{}{}, []int{8080, 9000}))
t.Override(&retrieveServices, func(context.Context, string, []string) ([]*latest.PortForwardResource, error) {
return []*latest.PortForwardResource{svc}, nil
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func errorHandler(ctx context.Context, _ *runtime.ServeMux, marshaler runtime.Ma

func listenOnAvailablePort(preferredPort int, usedPorts *util.PortSet) (net.Listener, int, error) {
for try := 1; ; try++ {
port := util.GetAvailablePort(preferredPort, usedPorts)
port := util.GetAvailablePort(util.Loopback, preferredPort, usedPorts)

l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", util.Loopback, port))
if err != nil {
Expand Down
35 changes: 24 additions & 11 deletions pkg/skaffold/util/port.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
// unless we really want to expose something to the network.
const Loopback = "127.0.0.1"

// Network address which represent any address. This is the default that
// we should use when checking if port is free.
const Any = ""

type PortSet struct {
ports map[int]bool
lock sync.Mutex
Expand Down Expand Up @@ -88,36 +92,36 @@ func (f *PortSet) List() []int {
}

// GetAvailablePort returns an available port that is near the requested port when possible.
// First, check if the provided port is available on the specified address. If so, use it.
// First, check if the provided port is available on the specified address and INADDR_ANY. If so, use it.
// If not, check if any of the next 10 subsequent ports are available.
// If not, check if any of ports 4503-4533 are available.
// If not, return a random port, which hopefully won't collide with any future containers
//
// See https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.txt
func GetAvailablePort(port int, usedPorts *PortSet) int {
func GetAvailablePort(address string, port int, usedPorts *PortSet) int {
if port > 0 {
if getPortIfAvailable(port, usedPorts) {
if getPortIfAvailable(address, port, usedPorts) {
return port
}

// try the next 10 ports after the provided one
for i := 0; i < 10; i++ {
port++
if getPortIfAvailable(port, usedPorts) {
if getPortIfAvailable(address, port, usedPorts) {
logrus.Debugf("found open port: %d", port)
return port
}
}
}

for port = 4503; port <= 4533; port++ {
if getPortIfAvailable(port, usedPorts) {
if getPortIfAvailable(address, port, usedPorts) {
logrus.Debugf("found open port: %d", port)
return port
}
}

l, err := net.Listen("tcp", ":0")
l, err := net.Listen("tcp", fmt.Sprintf("%s:0", address))
if err != nil {
return -1
}
Expand All @@ -129,20 +133,29 @@ func GetAvailablePort(port int, usedPorts *PortSet) int {
return p
}

func getPortIfAvailable(p int, usedPorts *PortSet) bool {
func getPortIfAvailable(address string, p int, usedPorts *PortSet) bool {
if alreadySet := usedPorts.LoadOrSet(p); alreadySet {
return false
}

return IsPortFree(p)
return IsPortFree(address, p)
}

func IsPortFree(p int) bool {
func IsPortFree(address string, p int) bool {
// Ensure the port is available across all interfaces
l, err := net.Listen("tcp", fmt.Sprintf(":%d", p))
if err != nil {
if err != nil || l == nil {
return false
}

l.Close()

if address != Any {
// Ensure the port is available on the specific interface too
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, p))
if err != nil || l == nil {
return false
}
l.Close()
}
return true
}
2 changes: 1 addition & 1 deletion pkg/skaffold/util/port_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestGetAvailablePort(t *testing.T) {
wg.Add(N)
for i := 0; i < N; i++ {
go func() {
port := GetAvailablePort(4503, &ports)
port := GetAvailablePort(Loopback, 4503, &ports)

l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", Loopback, port))
if err != nil {
Expand Down

0 comments on commit 7aaa1c7

Please sign in to comment.