Skip to content

Commit

Permalink
hotfix: fix port-forward retry bug (#366)
Browse files (browse the repository at this point in the history)
  • Loading branch information
wencaiwulue authored Oct 30, 2024
1 parent 6e052a5 commit a243842
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 44 deletions.
2 changes: 1 addition & 1 deletion pkg/handler/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
var readyChan = make(chan struct{})
podName := podList[0].GetName()
// Watch for the pod being deleted; if it is, the port-forward has to be redone.
//go util.CheckPodStatus(childCtx, cancelFunc, podName, c.clientset.CoreV1().Pods(c.Namespace))
go util.CheckPodStatus(childCtx, cancelFunc, podName, c.clientset.CoreV1().Pods(c.Namespace))
go util.CheckPortStatus(childCtx, cancelFunc, readyChan, strings.Split(portPair[1], ":")[0])
if *first {
go func() {
Expand Down
84 changes: 41 additions & 43 deletions pkg/util/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericiooptions"
"k8s.io/cli-runtime/pkg/resource"
Expand Down Expand Up @@ -171,11 +169,20 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na
return err
}

if err = forwarder.ForwardPorts(); err != nil {
log.Debugf("Forward port error: %s", err.Error())
defer forwarder.Close()

var errChan = make(chan error, 1)
go func() {
errChan <- forwarder.ForwardPorts()
}()

select {
case err = <-errChan:
log.Debugf("Forward port error: %v", err)
return err
case <-stopChan:
return nil
}
return nil
}

func GetTopOwnerReference(factory util.Factory, ns, workload string) (*resource.Info, error) {
Expand Down Expand Up @@ -330,36 +337,42 @@ func FindContainerByName(pod *corev1.Pod, name string) (*corev1.Container, int)
}

func CheckPodStatus(ctx context.Context, cancelFunc context.CancelFunc, podName string, podInterface v12.PodInterface) {
var verifyAPIServerConnection = func() {
err := retry.OnError(
retry.DefaultBackoff,
func(err error) bool {
return err != nil
},
func() error {
ctx1, cancelFunc1 := context.WithTimeout(ctx, time.Second*10)
defer cancelFunc1()
_, err := podInterface.Get(ctx1, podName, v1.GetOptions{})
return err
})
if err != nil {
log.Debugf("Failed to get Pod %s: %v", podName, err)
cancelFunc()
}
}

for ctx.Err() == nil {
func() {
defer time.Sleep(time.Millisecond * 200)
defer time.Sleep(time.Second * 5)

w, err := podInterface.Watch(ctx, v1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", podName).String(),
})
if err != nil {
if !k8serrors.IsForbidden(err) && !errors.Is(err, context.Canceled) {
log.Debugf("Failed to watch Pod %s: %v", podName, err)
}
log.Debugf("Failed to watch Pod %s: %v", podName, err)
return
}
defer w.Stop()

_, err = podInterface.Get(ctx, podName, v1.GetOptions{})
if err != nil {
if !k8serrors.IsForbidden(err) && !errors.Is(err, context.Canceled) {
log.Debugf("Failed to get Pod %s: %v", podName, err)
}
return
}
verifyAPIServerConnection()
select {
case e, ok := <-w.ResultChan():
if !ok {
_, err = podInterface.Get(ctx, podName, v1.GetOptions{})
if err != nil && !errors.Is(err, context.Canceled) {
log.Debugf("Failed to get Pod %s: %v", podName, err)
cancelFunc()
}
verifyAPIServerConnection()
return
}
switch e.Type {
Expand All @@ -368,11 +381,7 @@ func CheckPodStatus(ctx context.Context, cancelFunc context.CancelFunc, podName
cancelFunc()
return
case watch.Error:
_, err = podInterface.Get(ctx, podName, v1.GetOptions{})
if err != nil && !errors.Is(err, context.Canceled) {
log.Debugf("Failed to get Pod %s: %v", podName, err)
cancelFunc()
}
verifyAPIServerConnection()
return
case watch.Added, watch.Modified, watch.Bookmark:
// do nothing
Expand All @@ -397,25 +406,14 @@ func CheckPortStatus(ctx context.Context, cancelFunc context.CancelFunc, readyCh
}

for ctx.Err() == nil {
err := retry.OnError(wait.Backoff{
Steps: 6,
Duration: time.Second,
}, func(err error) bool {
return err != nil
}, func() error {
var lc net.ListenConfig
conn, err := lc.Listen(ctx, "tcp", net.JoinHostPort("127.0.0.1", localGvisorTCPPort))
if err == nil {
_ = conn.Close()
return errors.New("port is free")
}
return nil
})
if err != nil {
log.Debugf("Can not dial local port: %s: %v", localGvisorTCPPort, err)
var lc net.ListenConfig
conn, err := lc.Listen(ctx, "tcp", net.JoinHostPort("127.0.0.1", localGvisorTCPPort))
if err == nil {
_ = conn.Close()
log.Debugf("Local port: %s is free", localGvisorTCPPort)
return
}
time.Sleep(time.Second * 5)
time.Sleep(time.Second * 1)
}
}

Expand Down

0 comments on commit a243842

Please sign in to comment.