From 0e1affb1af7c5e354e0a5acf33bc10876e0af5ea Mon Sep 17 00:00:00 2001 From: matoval Date: Thu, 5 Dec 2024 21:06:15 -0800 Subject: [PATCH 1/6] Add test for kubeLoggingWithReconnect --- go.mod | 2 + pkg/workceptor/kubernetes.go | 10 ++--- pkg/workceptor/kubernetes_test.go | 62 +++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index d39050b09..8b2636254 100644 --- a/go.mod +++ b/go.mod @@ -62,6 +62,7 @@ require ( github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/onsi/ginkgo/v2 v2.21.0 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect @@ -83,6 +84,7 @@ require ( golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.27.0 // indirect google.golang.org/protobuf v1.34.2 // indirect + gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/pkg/workceptor/kubernetes.go b/pkg/workceptor/kubernetes.go index 3e673c110..3a3f91bda 100644 --- a/pkg/workceptor/kubernetes.go +++ b/pkg/workceptor/kubernetes.go @@ -294,7 +294,7 @@ func (kw *KubeUnit) kubeLoggingNoReconnect(streamWait *sync.WaitGroup, stdout *S } } -func (kw *KubeUnit) kubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout *STDoutWriter, stdinErr *error, stdoutErr *error) { +func (kw *KubeUnit) KubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout *STDoutWriter, stdinErr *error, stdoutErr *error) { // preferred method for k8s >= 1.23.14 defer streamWait.Done() var sinceTime time.Time @@ -420,7 +420,7 @@ func (kw *KubeUnit) kubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout } } -func (kw *KubeUnit) createPod(env map[string]string) error { +func (kw *KubeUnit) CreatePod(env map[string]string) error { ked := kw.UnredactedStatus().ExtraData.(*KubeExtraData) command, err := shlex.Split(ked.Command) if err != nil { @@ -621,7 +621,7 @@ func (kw *KubeUnit) runWorkUsingLogger() { if podName == "" { // create new pod if ked.PodName is empty // TODO: add retry logic to make this more resilient to transient errors - if err := kw.createPod(nil); err != nil { + if err := kw.CreatePod(nil); err != nil { if err != ErrPodCompleted { errMsg := fmt.Sprintf("Error creating pod: %s", err) kw.GetWorkceptor().nc.GetLogger().Error(errMsg) //nolint:govet @@ -842,7 +842,7 @@ func (kw *KubeUnit) runWorkUsingLogger() { stdoutWithReconnect := ShouldUseReconnect(kw) if stdoutWithReconnect && stdoutErr == nil { kw.GetWorkceptor().nc.GetLogger().Debug("streaming stdout with reconnect support") - go kw.kubeLoggingWithReconnect(&streamWait, stdout, &stdinErr, &stdoutErr) + go kw.KubeLoggingWithReconnect(&streamWait, stdout, &stdinErr, &stdoutErr) } else { kw.GetWorkceptor().nc.GetLogger().Debug("streaming stdout with no reconnect support") go kw.kubeLoggingNoReconnect(&streamWait, stdout, &stdoutErr) @@ -1062,7 +1062,7 @@ func (kw *KubeUnit) runWorkUsingTCP() { }() // Create the pod - err = kw.createPod(map[string]string{"RECEPTOR_HOST": listenHost, "RECEPTOR_PORT": listenPort}) + err = kw.CreatePod(map[string]string{"RECEPTOR_HOST": listenHost, "RECEPTOR_PORT": listenPort}) if err != nil { errMsg := fmt.Sprintf("Error creating pod: %s", err) kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0) diff --git a/pkg/workceptor/kubernetes_test.go b/pkg/workceptor/kubernetes_test.go index 254acb045..cb7a017a1 100644 --- a/pkg/workceptor/kubernetes_test.go +++ b/pkg/workceptor/kubernetes_test.go @@ -20,6 +20,7 @@ import ( "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" ) @@ -423,3 +424,64 @@ func Test_IsCompatibleK8S(t *testing.T) { }) } } + +func TestKubeLoggingWithReconnect(t *testing.T) { + var stdinErr error + var stdoutErr error + ctx := context.Background() + ctrl := gomock.NewController(t) + mockBaseWorkUnitForWorkUnit := mock_workceptor.NewMockBaseWorkUnitForWorkUnit(ctrl) + mockBaseWorkUnit := mock_workceptor.NewMockBaseWorkUnitForWorkUnit(ctrl) + mockNetceptor := mock_workceptor.NewMockNetceptorForWorkceptor(ctrl) + mockNetceptor.EXPECT().NodeID().Return("NodeID") + mockKubeAPI := mock_workceptor.NewMockKubeAPIer(ctrl) + + kw := &workceptor.KubeUnit{ + BaseWorkUnitForWorkUnit: mockBaseWorkUnitForWorkUnit, + } + + w, err := workceptor.New(ctx, mockNetceptor, "/tmp") + if err != nil { + t.Errorf("Error while creating Workceptor: %v", err) + } + + mockBaseWorkUnit.EXPECT().Init(w, "", "", workceptor.FileSystem{}, nil) + kubeConfig := workceptor.KubeWorkerCfg{AuthMethod: "incluster"} + kubeConfig.NewkubeWorker(mockBaseWorkUnit, w, "", "", mockKubeAPI) + + tests := []struct { + name string + expectedCalls func() + }{ + { + name: "Kube error should be read", + expectedCalls: func() { + lock := &sync.RWMutex{} + mockBaseWorkUnitForWorkUnit.EXPECT().GetStatusLock().Return(lock).AnyTimes() + kubeExtraData := workceptor.KubeExtraData{} + status := workceptor.StatusFileData{ExtraData: &kubeExtraData} + mockBaseWorkUnitForWorkUnit.EXPECT().GetStatusWithoutExtraData().Return(&status).AnyTimes() + mockBaseWorkUnitForWorkUnit.EXPECT().GetStatusCopy().Return(status).AnyTimes() + mockBaseWorkUnitForWorkUnit.EXPECT().GetContext().Return(ctx).AnyTimes() + pod := corev1.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "Test Name"}, Spec: corev1.PodSpec{}, Status: corev1.PodStatus{}} + mockKubeAPI.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes() + mockBaseWorkUnitForWorkUnit.EXPECT().UpdateFullStatus(gomock.Any()).AnyTimes() + field := hasTerm{} + mockKubeAPI.EXPECT().OneTermEqualSelector(gomock.Any(), gomock.Any()).Return(&field).AnyTimes() + ev := watch.Event{Object: &pod} + mockKubeAPI.EXPECT().UntilWithSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&ev, nil).AnyTimes() + mockKubeAPI.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes() + client := fake.NewSimpleClientset() + req := client.CoreV1().Pods("default").GetLogs("pod-1", &corev1.PodLogOptions{}) + mockKubeAPI.EXPECT().GetLogs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(req) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.expectedCalls() + kw.CreatePod(nil) + kw.KubeLoggingWithReconnect(&sync.WaitGroup{}, &workceptor.STDoutWriter{}, &stdinErr, &stdoutErr) + }) + } +} From e8e552023d6886e1d2a5155b3dbbac9b59b99ec0 Mon Sep 17 00:00:00 2001 From: matoval Date: Sun, 8 Dec 2024 17:14:35 -0800 Subject: [PATCH 2/6] Test passes with correct values --- pkg/workceptor/kubernetes_test.go | 38 ++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/pkg/workceptor/kubernetes_test.go b/pkg/workceptor/kubernetes_test.go index cb7a017a1..c5a2ffa8c 100644 --- a/pkg/workceptor/kubernetes_test.go +++ b/pkg/workceptor/kubernetes_test.go @@ -2,8 +2,12 @@ package workceptor_test import ( "context" + "fmt" + "io" + "net/http" "os" "reflect" + "strings" "sync" "testing" "time" @@ -20,9 +24,10 @@ import ( "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/rest" + fakerest "k8s.io/client-go/rest/fake" "k8s.io/client-go/tools/remotecommand" + "k8s.io/client-go/kubernetes/scheme" ) func startNetceptorNodeWithWorkceptor() (*workceptor.KubeUnit, error) { @@ -463,7 +468,7 @@ func TestKubeLoggingWithReconnect(t *testing.T) { mockBaseWorkUnitForWorkUnit.EXPECT().GetStatusWithoutExtraData().Return(&status).AnyTimes() mockBaseWorkUnitForWorkUnit.EXPECT().GetStatusCopy().Return(status).AnyTimes() mockBaseWorkUnitForWorkUnit.EXPECT().GetContext().Return(ctx).AnyTimes() - pod := corev1.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "Test Name"}, Spec: corev1.PodSpec{}, Status: corev1.PodStatus{}} + pod := corev1.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "Test_Name"}, Spec: corev1.PodSpec{}, Status: corev1.PodStatus{}} mockKubeAPI.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes() mockBaseWorkUnitForWorkUnit.EXPECT().UpdateFullStatus(gomock.Any()).AnyTimes() field := hasTerm{} @@ -471,9 +476,22 @@ func TestKubeLoggingWithReconnect(t *testing.T) { ev := watch.Event{Object: &pod} mockKubeAPI.EXPECT().UntilWithSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&ev, nil).AnyTimes() mockKubeAPI.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes() - client := fake.NewSimpleClientset() - req := client.CoreV1().Pods("default").GetLogs("pod-1", &corev1.PodLogOptions{}) - mockKubeAPI.EXPECT().GetLogs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(req) + req := fakerest.RESTClient{ + Client: fakerest.CreateHTTPClient(func(request *http.Request) (*http.Response, error) { + resp := &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("2024-12-09T00:31:18.823849250Z HI\n kube error")), + } + return resp, nil + }), + NegotiatedSerializer: scheme.Codecs.WithoutConversion(), + GroupVersion: pod.GroupVersionKind().GroupVersion(), + VersionedAPIPath: fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/log", pod.Namespace, pod.Name), + } + mockKubeAPI.EXPECT().GetLogs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(req.Request()).AnyTimes() + mockBaseWorkUnitForWorkUnit.EXPECT().GetWorkceptor().Return(w).AnyTimes() + logger := logger.NewReceptorLogger("") + mockNetceptor.EXPECT().GetLogger().Return(logger).AnyTimes() }, }, } @@ -481,7 +499,15 @@ func TestKubeLoggingWithReconnect(t *testing.T) { t.Run(tt.name, func(t *testing.T) { tt.expectedCalls() kw.CreatePod(nil) - kw.KubeLoggingWithReconnect(&sync.WaitGroup{}, &workceptor.STDoutWriter{}, &stdinErr, &stdoutErr) + wg := &sync.WaitGroup{} + wg.Add(1) + mockfilesystemer := mock_workceptor.NewMockFileSystemer(ctrl) + mockfilesystemer.EXPECT().OpenFile(gomock.Any(), gomock.Any(), gomock.Any()).Return(&os.File{}, nil) + stdout, _ := workceptor.NewStdoutWriter(mockfilesystemer, "") + mockFileWC := mock_workceptor.NewMockFileWriteCloser(ctrl) + stdout.SetWriter(mockFileWC) + mockFileWC.EXPECT().Write(gomock.AnyOf([]byte("HI\n"), []byte(" kube error\n"))).Return(0, nil).Times(2) + kw.KubeLoggingWithReconnect(wg, stdout, &stdinErr, &stdoutErr) }) } } From 9358443e373c0f3e9a2fbd3a59b8ae47e292d2f8 Mon Sep 17 00:00:00 2001 From: matoval Date: Sun, 8 Dec 2024 17:27:36 -0800 Subject: [PATCH 3/6] Run golangci-lint --- pkg/workceptor/kubernetes_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/workceptor/kubernetes_test.go b/pkg/workceptor/kubernetes_test.go index c5a2ffa8c..bb23bdf27 100644 --- a/pkg/workceptor/kubernetes_test.go +++ b/pkg/workceptor/kubernetes_test.go @@ -24,10 +24,10 @@ import ( "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" fakerest "k8s.io/client-go/rest/fake" "k8s.io/client-go/tools/remotecommand" - "k8s.io/client-go/kubernetes/scheme" ) func startNetceptorNodeWithWorkceptor() (*workceptor.KubeUnit, error) { @@ -455,11 +455,11 @@ func TestKubeLoggingWithReconnect(t *testing.T) { kubeConfig.NewkubeWorker(mockBaseWorkUnit, w, "", "", mockKubeAPI) tests := []struct { - name string + name string expectedCalls func() }{ { - name: "Kube error should be read", + name: "Kube error should be read", expectedCalls: func() { lock := &sync.RWMutex{} mockBaseWorkUnitForWorkUnit.EXPECT().GetStatusLock().Return(lock).AnyTimes() @@ -482,6 +482,7 @@ func TestKubeLoggingWithReconnect(t *testing.T) { StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader("2024-12-09T00:31:18.823849250Z HI\n kube error")), } + return resp, nil }), NegotiatedSerializer: scheme.Codecs.WithoutConversion(), From 9144bb51cf70dce7d8d62a786b0f92f826207887 Mon Sep 17 00:00:00 2001 From: matoval Date: Sun, 8 Dec 2024 18:25:16 -0800 Subject: [PATCH 4/6] Fix test --- pkg/workceptor/kubernetes_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/workceptor/kubernetes_test.go b/pkg/workceptor/kubernetes_test.go index bb23bdf27..9c120df33 100644 --- a/pkg/workceptor/kubernetes_test.go +++ b/pkg/workceptor/kubernetes_test.go @@ -2,7 +2,6 @@ package workceptor_test import ( "context" - "fmt" "io" "net/http" "os" @@ -486,13 +485,14 @@ func TestKubeLoggingWithReconnect(t *testing.T) { return resp, nil }), NegotiatedSerializer: scheme.Codecs.WithoutConversion(), - GroupVersion: pod.GroupVersionKind().GroupVersion(), - VersionedAPIPath: fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/log", pod.Namespace, pod.Name), } mockKubeAPI.EXPECT().GetLogs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(req.Request()).AnyTimes() mockBaseWorkUnitForWorkUnit.EXPECT().GetWorkceptor().Return(w).AnyTimes() logger := logger.NewReceptorLogger("") mockNetceptor.EXPECT().GetLogger().Return(logger).AnyTimes() + mockKubeAPI.EXPECT().SubResource(gomock.Any(), gomock.Any(), gomock.Any()).Return(req.Request()).AnyTimes() + exec := ex{} + mockKubeAPI.EXPECT().NewSPDYExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(&exec, nil).AnyTimes() }, }, } From e14eb385d26a2948649ca4911da4dfe28680fd3e Mon Sep 17 00:00:00 2001 From: matoval Date: Mon, 9 Dec 2024 12:59:50 -0800 Subject: [PATCH 5/6] Update test and fix goroutine error --- pkg/workceptor/kubernetes.go | 10 +++++-- pkg/workceptor/kubernetes_test.go | 46 +++++++++++++------------------ 2 files changed, 26 insertions(+), 30 deletions(-) diff --git a/pkg/workceptor/kubernetes.go b/pkg/workceptor/kubernetes.go index 3a3f91bda..0b0ccdf3b 100644 --- a/pkg/workceptor/kubernetes.go +++ b/pkg/workceptor/kubernetes.go @@ -171,7 +171,7 @@ func (ku KubeAPIWrapper) NewFakeAlwaysRateLimiter() flowcontrol.RateLimiter { // It is instantiated in the NewkubeWorker function and available throughout the package. var KubeAPIWrapperInstance KubeAPIer -var KubeAPIWrapperLock *sync.RWMutex +var KubeAPIWrapperLock sync.Mutex // ErrPodCompleted is returned when pod has already completed before we could attach. var ErrPodCompleted = fmt.Errorf("pod ran to completion") @@ -522,7 +522,9 @@ func (kw *KubeUnit) CreatePod(env map[string]string) error { }) // Wait for the pod to be running + KubeAPIWrapperLock.Lock() fieldSelector := KubeAPIWrapperInstance.OneTermEqualSelector("metadata.name", kw.pod.Name).String() + KubeAPIWrapperLock.Unlock() lw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { options.FieldSelector = fieldSelector @@ -544,7 +546,9 @@ func (kw *KubeUnit) CreatePod(env map[string]string) error { } time.Sleep(2 * time.Second) + KubeAPIWrapperLock.Lock() ev, err := KubeAPIWrapperInstance.UntilWithSync(ctxPodReady, lw, &corev1.Pod{}, nil, podRunningAndReady()) + KubeAPIWrapperLock.Unlock() if ev == nil || ev.Object == nil { return fmt.Errorf("did not return an event while watching pod for work unit %s", kw.ID()) } @@ -1574,11 +1578,11 @@ func (cfg KubeWorkerCfg) NewkubeWorker(bwu BaseWorkUnitForWorkUnit, w *Workcepto } } - KubeAPIWrapperLock = &sync.RWMutex{} KubeAPIWrapperLock.Lock() - KubeAPIWrapperInstance = KubeAPIWrapper{} if kawi != nil { KubeAPIWrapperInstance = kawi + } else { + KubeAPIWrapperInstance = KubeAPIWrapper{} } KubeAPIWrapperLock.Unlock() diff --git a/pkg/workceptor/kubernetes_test.go b/pkg/workceptor/kubernetes_test.go index 9c120df33..0cd25bdbb 100644 --- a/pkg/workceptor/kubernetes_test.go +++ b/pkg/workceptor/kubernetes_test.go @@ -144,7 +144,7 @@ func TestParseTime(t *testing.T) { } } -func createKubernetesTestSetup(t *testing.T) (workceptor.WorkUnit, *mock_workceptor.MockBaseWorkUnitForWorkUnit, *mock_workceptor.MockNetceptorForWorkceptor, *workceptor.Workceptor, *mock_workceptor.MockKubeAPIer, context.Context) { +func createKubernetesTestSetup(t *testing.T) (workceptor.WorkUnit, *mock_workceptor.MockBaseWorkUnitForWorkUnit, *mock_workceptor.MockNetceptorForWorkceptor, *workceptor.Workceptor, *mock_workceptor.MockKubeAPIer, *gomock.Controller, context.Context) { ctrl := gomock.NewController(t) ctx := context.Background() @@ -162,7 +162,7 @@ func createKubernetesTestSetup(t *testing.T) (workceptor.WorkUnit, *mock_workcep kubeConfig := workceptor.KubeWorkerCfg{AuthMethod: "incluster"} ku := kubeConfig.NewkubeWorker(mockBaseWorkUnit, w, "", "", mockKubeAPI) - return ku, mockBaseWorkUnit, mockNetceptor, w, mockKubeAPI, ctx + return ku, mockBaseWorkUnit, mockNetceptor, w, mockKubeAPI, ctrl, ctx } type hasTerm struct { @@ -194,7 +194,7 @@ func (e *ex) StreamWithContext(_ context.Context, _ remotecommand.StreamOptions) } func TestKubeStart(t *testing.T) { - ku, mockbwu, mockNet, w, mockKubeAPI, ctx := createKubernetesTestSetup(t) + ku, mockbwu, mockNet, w, mockKubeAPI, _, ctx := createKubernetesTestSetup(t) startTestCases := []struct { name string @@ -432,27 +432,11 @@ func Test_IsCompatibleK8S(t *testing.T) { func TestKubeLoggingWithReconnect(t *testing.T) { var stdinErr error var stdoutErr error - ctx := context.Background() - ctrl := gomock.NewController(t) - mockBaseWorkUnitForWorkUnit := mock_workceptor.NewMockBaseWorkUnitForWorkUnit(ctrl) - mockBaseWorkUnit := mock_workceptor.NewMockBaseWorkUnitForWorkUnit(ctrl) - mockNetceptor := mock_workceptor.NewMockNetceptorForWorkceptor(ctrl) - mockNetceptor.EXPECT().NodeID().Return("NodeID") - mockKubeAPI := mock_workceptor.NewMockKubeAPIer(ctrl) + ku, mockBaseWorkUnit, mockNetceptor, w, mockKubeAPI, ctrl, ctx := createKubernetesTestSetup(t) kw := &workceptor.KubeUnit{ - BaseWorkUnitForWorkUnit: mockBaseWorkUnitForWorkUnit, + BaseWorkUnitForWorkUnit: mockBaseWorkUnit, } - - w, err := workceptor.New(ctx, mockNetceptor, "/tmp") - if err != nil { - t.Errorf("Error while creating Workceptor: %v", err) - } - - mockBaseWorkUnit.EXPECT().Init(w, "", "", workceptor.FileSystem{}, nil) - kubeConfig := workceptor.KubeWorkerCfg{AuthMethod: "incluster"} - kubeConfig.NewkubeWorker(mockBaseWorkUnit, w, "", "", mockKubeAPI) - tests := []struct { name string expectedCalls func() @@ -460,16 +444,24 @@ func TestKubeLoggingWithReconnect(t *testing.T) { { name: "Kube error should be read", expectedCalls: func() { + mockBaseWorkUnit.EXPECT().UpdateBasicStatus(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + config := rest.Config{} + mockKubeAPI.EXPECT().InClusterConfig().Return(&config, nil) + mockBaseWorkUnit.EXPECT().GetWorkceptor().Return(w).AnyTimes() + clientset := kubernetes.Clientset{} + mockKubeAPI.EXPECT().NewForConfig(gomock.Any()).Return(&clientset, nil) lock := &sync.RWMutex{} - mockBaseWorkUnitForWorkUnit.EXPECT().GetStatusLock().Return(lock).AnyTimes() + mockBaseWorkUnit.EXPECT().GetStatusLock().Return(lock).AnyTimes() + mockBaseWorkUnit.EXPECT().MonitorLocalStatus().AnyTimes() + mockBaseWorkUnit.EXPECT().UnitDir().Return("TestDir2").AnyTimes() kubeExtraData := workceptor.KubeExtraData{} status := workceptor.StatusFileData{ExtraData: &kubeExtraData} - mockBaseWorkUnitForWorkUnit.EXPECT().GetStatusWithoutExtraData().Return(&status).AnyTimes() - mockBaseWorkUnitForWorkUnit.EXPECT().GetStatusCopy().Return(status).AnyTimes() - mockBaseWorkUnitForWorkUnit.EXPECT().GetContext().Return(ctx).AnyTimes() + mockBaseWorkUnit.EXPECT().GetStatusWithoutExtraData().Return(&status).AnyTimes() + mockBaseWorkUnit.EXPECT().GetStatusCopy().Return(status).AnyTimes() + mockBaseWorkUnit.EXPECT().GetContext().Return(ctx).AnyTimes() pod := corev1.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "Test_Name"}, Spec: corev1.PodSpec{}, Status: corev1.PodStatus{}} mockKubeAPI.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes() - mockBaseWorkUnitForWorkUnit.EXPECT().UpdateFullStatus(gomock.Any()).AnyTimes() + mockBaseWorkUnit.EXPECT().UpdateFullStatus(gomock.Any()).AnyTimes() field := hasTerm{} mockKubeAPI.EXPECT().OneTermEqualSelector(gomock.Any(), gomock.Any()).Return(&field).AnyTimes() ev := watch.Event{Object: &pod} @@ -487,7 +479,6 @@ func TestKubeLoggingWithReconnect(t *testing.T) { NegotiatedSerializer: scheme.Codecs.WithoutConversion(), } mockKubeAPI.EXPECT().GetLogs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(req.Request()).AnyTimes() - mockBaseWorkUnitForWorkUnit.EXPECT().GetWorkceptor().Return(w).AnyTimes() logger := logger.NewReceptorLogger("") mockNetceptor.EXPECT().GetLogger().Return(logger).AnyTimes() mockKubeAPI.EXPECT().SubResource(gomock.Any(), gomock.Any(), gomock.Any()).Return(req.Request()).AnyTimes() @@ -499,6 +490,7 @@ func TestKubeLoggingWithReconnect(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.expectedCalls() + ku.Start() kw.CreatePod(nil) wg := &sync.WaitGroup{} wg.Add(1) From 6722915920076e22af7a61785e31a66be3fd3cf2 Mon Sep 17 00:00:00 2001 From: matoval Date: Mon, 9 Dec 2024 13:02:05 -0800 Subject: [PATCH 6/6] fix lint --- pkg/workceptor/kubernetes.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/workceptor/kubernetes.go b/pkg/workceptor/kubernetes.go index 0b0ccdf3b..310af3492 100644 --- a/pkg/workceptor/kubernetes.go +++ b/pkg/workceptor/kubernetes.go @@ -1582,7 +1582,7 @@ func (cfg KubeWorkerCfg) NewkubeWorker(bwu BaseWorkUnitForWorkUnit, w *Workcepto if kawi != nil { KubeAPIWrapperInstance = kawi } else { - KubeAPIWrapperInstance = KubeAPIWrapper{} + KubeAPIWrapperInstance = KubeAPIWrapper{} } KubeAPIWrapperLock.Unlock()