Add test for kubeLoggingWithReconnect #1236

Merged · 7 commits · Dec 10, 2024
go.mod: 2 additions & 0 deletions

@@ -62,6 +62,7 @@ require (
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/onsi/ginkgo/v2 v2.21.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
+ github.com/pkg/errors v0.9.1 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
@@ -83,6 +84,7 @@
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.27.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
+ gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
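Both new entries are indirect dependencies; they are presumably pulled in transitively by the client-go fake REST client and serializer scheme packages that the new test imports (see the kubernetes_test.go diff below).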
pkg/workceptor/kubernetes.go: 12 additions & 8 deletions

@@ -171,7 +171,7 @@
// It is instantiated in the NewkubeWorker function and available throughout the package.
var KubeAPIWrapperInstance KubeAPIer

- var KubeAPIWrapperLock *sync.RWMutex
+ var KubeAPIWrapperLock sync.Mutex

// ErrPodCompleted is returned when pod has already completed before we could attach.
var ErrPodCompleted = fmt.Errorf("pod ran to completion")
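The hunk above swaps the pointer-typed *sync.RWMutex, which had to be allocated before first use (see the removed &sync.RWMutex{} line near the end of this file's diff), for a value-typed sync.Mutex whose zero value is ready to use. A minimal sketch of the resulting pattern, with hypothetical names rather than the package's actual code:

// Sketch only: a package-level instance guarded by a zero-value sync.Mutex,
// mirroring how KubeAPIWrapperInstance is installed and read under
// KubeAPIWrapperLock. All names here are hypothetical.
package wrapper

import "sync"

type APIer interface{ Version() string }

type defaultAPI struct{}

func (defaultAPI) Version() string { return "default" }

var (
	instance APIer
	lock     sync.Mutex // value type: usable without prior allocation
)

// Install swaps in a (possibly mock) implementation under the lock,
// falling back to the default when nil is passed.
func Install(a APIer) {
	lock.Lock()
	defer lock.Unlock()
	if a != nil {
		instance = a
	} else {
		instance = defaultAPI{}
	}
}

// CurrentVersion reads the shared instance under the same lock.
func CurrentVersion() string {
	lock.Lock()
	defer lock.Unlock()
	return instance.Version()
}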
@@ -294,7 +294,7 @@
}
}

- func (kw *KubeUnit) kubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout *STDoutWriter, stdinErr *error, stdoutErr *error) {
+ func (kw *KubeUnit) KubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout *STDoutWriter, stdinErr *error, stdoutErr *error) {
// preferred method for k8s >= 1.23.14
defer streamWait.Done()
var sinceTime time.Time
@@ -420,7 +420,7 @@
}
}

- func (kw *KubeUnit) createPod(env map[string]string) error {
+ func (kw *KubeUnit) CreatePod(env map[string]string) error {
ked := kw.UnredactedStatus().ExtraData.(*KubeExtraData)
command, err := shlex.Split(ked.Command)
if err != nil {
@@ -522,7 +522,9 @@
})

// Wait for the pod to be running
+ KubeAPIWrapperLock.Lock()
fieldSelector := KubeAPIWrapperInstance.OneTermEqualSelector("metadata.name", kw.pod.Name).String()
+ KubeAPIWrapperLock.Unlock()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
@@ -544,7 +546,9 @@
}

time.Sleep(2 * time.Second)
+ KubeAPIWrapperLock.Lock()
ev, err := KubeAPIWrapperInstance.UntilWithSync(ctxPodReady, lw, &corev1.Pod{}, nil, podRunningAndReady())
+ KubeAPIWrapperLock.Unlock()
if ev == nil || ev.Object == nil {
return fmt.Errorf("did not return an event while watching pod for work unit %s", kw.ID())
}
@@ -621,7 +625,7 @@
if podName == "" {
// create new pod if ked.PodName is empty
// TODO: add retry logic to make this more resilient to transient errors
- if err := kw.createPod(nil); err != nil {
+ if err := kw.CreatePod(nil); err != nil {
if err != ErrPodCompleted {
errMsg := fmt.Sprintf("Error creating pod: %s", err)
kw.GetWorkceptor().nc.GetLogger().Error(errMsg) //nolint:govet
@@ -842,7 +846,7 @@
stdoutWithReconnect := ShouldUseReconnect(kw)
if stdoutWithReconnect && stdoutErr == nil {
kw.GetWorkceptor().nc.GetLogger().Debug("streaming stdout with reconnect support")
- go kw.kubeLoggingWithReconnect(&streamWait, stdout, &stdinErr, &stdoutErr)
+ go kw.KubeLoggingWithReconnect(&streamWait, stdout, &stdinErr, &stdoutErr)

[Codecov / codecov/patch warning: added line pkg/workceptor/kubernetes.go#L849 was not covered by tests]
} else {
kw.GetWorkceptor().nc.GetLogger().Debug("streaming stdout with no reconnect support")
go kw.kubeLoggingNoReconnect(&streamWait, stdout, &stdoutErr)
@@ -1062,7 +1066,7 @@
}()

// Create the pod
- err = kw.createPod(map[string]string{"RECEPTOR_HOST": listenHost, "RECEPTOR_PORT": listenPort})
+ err = kw.CreatePod(map[string]string{"RECEPTOR_HOST": listenHost, "RECEPTOR_PORT": listenPort})

[Codecov / codecov/patch warning: added line pkg/workceptor/kubernetes.go#L1069 was not covered by tests]
if err != nil {
errMsg := fmt.Sprintf("Error creating pod: %s", err)
kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
@@ -1574,11 +1578,11 @@
}
}

- KubeAPIWrapperLock = &sync.RWMutex{}
KubeAPIWrapperLock.Lock()
- KubeAPIWrapperInstance = KubeAPIWrapper{}
if kawi != nil {
KubeAPIWrapperInstance = kawi
+ } else {
+ KubeAPIWrapperInstance = KubeAPIWrapper{}

[Codecov / codecov/patch warning: added line pkg/workceptor/kubernetes.go#L1585 was not covered by tests]
}
KubeAPIWrapperLock.Unlock()

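Two of the renames above (kubeLoggingWithReconnect to KubeLoggingWithReconnect, createPod to CreatePod) export the methods so the new test below can call them directly. KubeLoggingWithReconnect follows the pod's log stream with timestamps enabled and, when the connection drops, reopens the stream from the last timestamp it recorded (the sinceTime variable above). A rough, self-contained sketch of that loop against standard client-go APIs; this illustrates the technique and is not the function's actual body:

// Sketch only, assuming standard client-go APIs. With Timestamps set, each
// log line arrives as "<RFC3339Nano timestamp> <text>", which is also the
// shape of the fake log body used in the test below.
package kubelogs

import (
	"bufio"
	"context"
	"fmt"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

func followLogs(ctx context.Context, cs kubernetes.Interface, ns, pod string) error {
	var since time.Time
	for ctx.Err() == nil {
		opts := &corev1.PodLogOptions{Follow: true, Timestamps: true}
		if !since.IsZero() {
			// Resume near the last timestamp seen rather than from the start.
			opts.SinceTime = &metav1.Time{Time: since}
		}
		stream, err := cs.CoreV1().Pods(ns).GetLogs(pod, opts).Stream(ctx)
		if err != nil {
			return err
		}
		sc := bufio.NewScanner(stream)
		for sc.Scan() {
			ts, msg, ok := strings.Cut(sc.Text(), " ")
			if !ok {
				continue
			}
			if t, err := time.Parse(time.RFC3339Nano, ts); err == nil {
				since = t // remember the resume point
			}
			fmt.Println(msg) // the real code writes to the unit's stdout file
		}
		stream.Close()
		// Scanner stopped: the connection dropped or the logs ended; loop and
		// reconnect with SinceTime to avoid replaying already-seen lines.
	}
	return ctx.Err()
}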
pkg/workceptor/kubernetes_test.go: 84 additions & 3 deletions

@@ -2,8 +2,11 @@ package workceptor_test

import (
"context"
+ "io"
+ "net/http"
"os"
"reflect"
+ "strings"
"sync"
"testing"
"time"
@@ -20,7 +23,9 @@ import (
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
+ fakerest "k8s.io/client-go/rest/fake"
"k8s.io/client-go/tools/remotecommand"
)

@@ -139,7 +144,7 @@ func TestParseTime(t *testing.T) {
}
}

- func createKubernetesTestSetup(t *testing.T) (workceptor.WorkUnit, *mock_workceptor.MockBaseWorkUnitForWorkUnit, *mock_workceptor.MockNetceptorForWorkceptor, *workceptor.Workceptor, *mock_workceptor.MockKubeAPIer, context.Context) {
+ func createKubernetesTestSetup(t *testing.T) (workceptor.WorkUnit, *mock_workceptor.MockBaseWorkUnitForWorkUnit, *mock_workceptor.MockNetceptorForWorkceptor, *workceptor.Workceptor, *mock_workceptor.MockKubeAPIer, *gomock.Controller, context.Context) {
ctrl := gomock.NewController(t)
ctx := context.Background()

@@ -157,7 +162,7 @@ func createKubernetesTestSetup(t *testing.T) (workceptor.WorkUnit, *mock_workcep
kubeConfig := workceptor.KubeWorkerCfg{AuthMethod: "incluster"}
ku := kubeConfig.NewkubeWorker(mockBaseWorkUnit, w, "", "", mockKubeAPI)

- return ku, mockBaseWorkUnit, mockNetceptor, w, mockKubeAPI, ctx
+ return ku, mockBaseWorkUnit, mockNetceptor, w, mockKubeAPI, ctrl, ctx
return ku, mockBaseWorkUnit, mockNetceptor, w, mockKubeAPI, ctrl, ctx
}

type hasTerm struct {
@@ -189,7 +194,7 @@ func (e *ex) StreamWithContext(_ context.Context, _ remotecommand.StreamOptions)
}

func TestKubeStart(t *testing.T) {
- ku, mockbwu, mockNet, w, mockKubeAPI, ctx := createKubernetesTestSetup(t)
+ ku, mockbwu, mockNet, w, mockKubeAPI, _, ctx := createKubernetesTestSetup(t)

startTestCases := []struct {
name string
@@ -423,3 +428,79 @@ func Test_IsCompatibleK8S(t *testing.T) {
})
}
}

func TestKubeLoggingWithReconnect(t *testing.T) {
var stdinErr error
var stdoutErr error
ku, mockBaseWorkUnit, mockNetceptor, w, mockKubeAPI, ctrl, ctx := createKubernetesTestSetup(t)

kw := &workceptor.KubeUnit{
BaseWorkUnitForWorkUnit: mockBaseWorkUnit,
}
tests := []struct {
name string
expectedCalls func()
}{
{
name: "Kube error should be read",
expectedCalls: func() {
mockBaseWorkUnit.EXPECT().UpdateBasicStatus(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
config := rest.Config{}
mockKubeAPI.EXPECT().InClusterConfig().Return(&config, nil)
mockBaseWorkUnit.EXPECT().GetWorkceptor().Return(w).AnyTimes()
clientset := kubernetes.Clientset{}
mockKubeAPI.EXPECT().NewForConfig(gomock.Any()).Return(&clientset, nil)
lock := &sync.RWMutex{}
mockBaseWorkUnit.EXPECT().GetStatusLock().Return(lock).AnyTimes()
mockBaseWorkUnit.EXPECT().MonitorLocalStatus().AnyTimes()
mockBaseWorkUnit.EXPECT().UnitDir().Return("TestDir2").AnyTimes()
kubeExtraData := workceptor.KubeExtraData{}
status := workceptor.StatusFileData{ExtraData: &kubeExtraData}
mockBaseWorkUnit.EXPECT().GetStatusWithoutExtraData().Return(&status).AnyTimes()
mockBaseWorkUnit.EXPECT().GetStatusCopy().Return(status).AnyTimes()
mockBaseWorkUnit.EXPECT().GetContext().Return(ctx).AnyTimes()
pod := corev1.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "Test_Name"}, Spec: corev1.PodSpec{}, Status: corev1.PodStatus{}}
mockKubeAPI.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes()
mockBaseWorkUnit.EXPECT().UpdateFullStatus(gomock.Any()).AnyTimes()
field := hasTerm{}
mockKubeAPI.EXPECT().OneTermEqualSelector(gomock.Any(), gomock.Any()).Return(&field).AnyTimes()
ev := watch.Event{Object: &pod}
mockKubeAPI.EXPECT().UntilWithSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&ev, nil).AnyTimes()
mockKubeAPI.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes()
req := fakerest.RESTClient{
Client: fakerest.CreateHTTPClient(func(request *http.Request) (*http.Response, error) {
resp := &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("2024-12-09T00:31:18.823849250Z HI\n kube error")),
}

return resp, nil
}),
NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
}
mockKubeAPI.EXPECT().GetLogs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(req.Request()).AnyTimes()
logger := logger.NewReceptorLogger("")
mockNetceptor.EXPECT().GetLogger().Return(logger).AnyTimes()
mockKubeAPI.EXPECT().SubResource(gomock.Any(), gomock.Any(), gomock.Any()).Return(req.Request()).AnyTimes()
exec := ex{}
mockKubeAPI.EXPECT().NewSPDYExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(&exec, nil).AnyTimes()
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.expectedCalls()
ku.Start()
kw.CreatePod(nil)
wg := &sync.WaitGroup{}
wg.Add(1)
mockfilesystemer := mock_workceptor.NewMockFileSystemer(ctrl)
mockfilesystemer.EXPECT().OpenFile(gomock.Any(), gomock.Any(), gomock.Any()).Return(&os.File{}, nil)
stdout, _ := workceptor.NewStdoutWriter(mockfilesystemer, "")
mockFileWC := mock_workceptor.NewMockFileWriteCloser(ctrl)
stdout.SetWriter(mockFileWC)
mockFileWC.EXPECT().Write(gomock.AnyOf([]byte("HI\n"), []byte(" kube error\n"))).Return(0, nil).Times(2)
kw.KubeLoggingWithReconnect(wg, stdout, &stdinErr, &stdoutErr)
})
}
}
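Assuming a standard checkout of the repository, the new test can be run on its own with the stock Go tooling:

go test ./pkg/workceptor/ -run TestKubeLoggingWithReconnect -v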