Skip to content

Commit

Permalink
Fix linting errors and add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
AaronH88 committed Feb 14, 2024
1 parent 43e9fb0 commit e4f638d
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 139 deletions.
67 changes: 34 additions & 33 deletions pkg/workceptor/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,29 +66,28 @@ type KubeExtraData struct {
}

type KubeAPIer interface {
NewNotFound(qualifiedResource schema.GroupResource, name string) *apierrors.StatusError
OneTermEqualSelector(k string, v string) fields.Selector
NewForConfig(c *rest.Config) (*kubernetes.Clientset, error)
GetLogs(clientset *kubernetes.Clientset, namespace string, name string, opts *corev1.PodLogOptions) *rest.Request
Get(clientset *kubernetes.Clientset, namespace string, ctx context.Context, name string, opts metav1.GetOptions) (*corev1.Pod, error)
Create(clientset *kubernetes.Clientset, namespace string, ctx context.Context, pod *corev1.Pod, opts metav1.CreateOptions) (*corev1.Pod, error)
List(clientset *kubernetes.Clientset, namespace string, ctx context.Context, opts metav1.ListOptions) (*corev1.PodList, error)
Watch(clientset *kubernetes.Clientset, namespace string, ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Delete(clientset *kubernetes.Clientset, namespace string, ctx context.Context, name string, opts metav1.DeleteOptions) error
SubResource(clientset *kubernetes.Clientset, podName string, podNamespace string) *rest.Request
NewNotFound(schema.GroupResource, string) *apierrors.StatusError
OneTermEqualSelector(string, string) fields.Selector
NewForConfig(*rest.Config) (*kubernetes.Clientset, error)
GetLogs(*kubernetes.Clientset, string, string, *corev1.PodLogOptions) *rest.Request
Get(context.Context, *kubernetes.Clientset, string, string, metav1.GetOptions) (*corev1.Pod, error)
Create(context.Context, *kubernetes.Clientset, string, *corev1.Pod, metav1.CreateOptions) (*corev1.Pod, error)
List(context.Context, *kubernetes.Clientset, string, metav1.ListOptions) (*corev1.PodList, error)
Watch(context.Context, *kubernetes.Clientset, string, metav1.ListOptions) (watch.Interface, error)
Delete(context.Context, *kubernetes.Clientset, string, string, metav1.DeleteOptions) error
SubResource(*kubernetes.Clientset, string, string) *rest.Request
InClusterConfig() (*rest.Config, error)
NewDefaultClientConfigLoadingRules() *clientcmd.ClientConfigLoadingRules
BuildConfigFromFlags(masterUrl string, kubeconfigPath string) (*rest.Config, error)
NewClientConfigFromBytes(configBytes []byte) (clientcmd.ClientConfig, error)
NewSPDYExecutor(config *rest.Config, method string, url *url.URL) (remotecommand.Executor, error)
StreamWithContext(exec remotecommand.Executor, ctx context.Context, options remotecommand.StreamOptions) error
UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition watch2.PreconditionFunc, conditions ...watch2.ConditionFunc) (*watch.Event, error)
BuildConfigFromFlags(string, string) (*rest.Config, error)
NewClientConfigFromBytes([]byte) (clientcmd.ClientConfig, error)
NewSPDYExecutor(*rest.Config, string, *url.URL) (remotecommand.Executor, error)
StreamWithContext(context.Context, remotecommand.Executor, remotecommand.StreamOptions) error
UntilWithSync(context.Context, cache.ListerWatcher, runtime.Object, watch2.PreconditionFunc, ...watch2.ConditionFunc) (*watch.Event, error)
NewFakeNeverRateLimiter() flowcontrol.RateLimiter
NewFakeAlwaysRateLimiter() flowcontrol.RateLimiter
}

type KubeAPIWrapper struct {
}
type KubeAPIWrapper struct{}

func (ku KubeAPIWrapper) NewNotFound(qualifiedResource schema.GroupResource, name string) *apierrors.StatusError {
return apierrors.NewNotFound(qualifiedResource, name)
Expand All @@ -106,23 +105,23 @@ func (ku KubeAPIWrapper) GetLogs(clientset *kubernetes.Clientset, namespace stri
return clientset.CoreV1().Pods(namespace).GetLogs(name, opts)
}

func (ku KubeAPIWrapper) Get(clientset *kubernetes.Clientset, namespace string, ctx context.Context, name string, opts metav1.GetOptions) (*corev1.Pod, error) {
func (ku KubeAPIWrapper) Get(ctx context.Context, clientset *kubernetes.Clientset, namespace string, name string, opts metav1.GetOptions) (*corev1.Pod, error) {
return clientset.CoreV1().Pods(namespace).Get(ctx, name, opts)
}

func (ku KubeAPIWrapper) Create(clientset *kubernetes.Clientset, namespace string, ctx context.Context, pod *corev1.Pod, opts metav1.CreateOptions) (*corev1.Pod, error) {
func (ku KubeAPIWrapper) Create(ctx context.Context, clientset *kubernetes.Clientset, namespace string, pod *corev1.Pod, opts metav1.CreateOptions) (*corev1.Pod, error) {
return clientset.CoreV1().Pods(namespace).Create(ctx, pod, opts)
}

func (ku KubeAPIWrapper) List(clientset *kubernetes.Clientset, namespace string, ctx context.Context, opts metav1.ListOptions) (*corev1.PodList, error) {
func (ku KubeAPIWrapper) List(ctx context.Context, clientset *kubernetes.Clientset, namespace string, opts metav1.ListOptions) (*corev1.PodList, error) {
return clientset.CoreV1().Pods(namespace).List(ctx, opts)
}

func (ku KubeAPIWrapper) Watch(clientset *kubernetes.Clientset, namespace string, ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
func (ku KubeAPIWrapper) Watch(ctx context.Context, clientset *kubernetes.Clientset, namespace string, opts metav1.ListOptions) (watch.Interface, error) {
return clientset.CoreV1().Pods(namespace).Watch(ctx, opts)
}

func (ku KubeAPIWrapper) Delete(clientset *kubernetes.Clientset, namespace string, ctx context.Context, name string, opts metav1.DeleteOptions) error {
func (ku KubeAPIWrapper) Delete(ctx context.Context, clientset *kubernetes.Clientset, namespace string, name string, opts metav1.DeleteOptions) error {
return clientset.CoreV1().Pods(namespace).Delete(ctx, name, opts)
}

Expand All @@ -138,8 +137,8 @@ func (ku KubeAPIWrapper) NewDefaultClientConfigLoadingRules() *clientcmd.ClientC
return clientcmd.NewDefaultClientConfigLoadingRules()
}

func (ku KubeAPIWrapper) BuildConfigFromFlags(masterUrl string, kubeconfigPath string) (*rest.Config, error) {
return clientcmd.BuildConfigFromFlags(masterUrl, kubeconfigPath)
func (ku KubeAPIWrapper) BuildConfigFromFlags(masterURL string, kubeconfigPath string) (*rest.Config, error) {
return clientcmd.BuildConfigFromFlags(masterURL, kubeconfigPath)
}

func (ku KubeAPIWrapper) NewClientConfigFromBytes(configBytes []byte) (clientcmd.ClientConfig, error) {
Expand All @@ -150,7 +149,7 @@ func (ku KubeAPIWrapper) NewSPDYExecutor(config *rest.Config, method string, url
return remotecommand.NewSPDYExecutor(config, method, url)
}

func (ku KubeAPIWrapper) StreamWithContext(exec remotecommand.Executor, ctx context.Context, options remotecommand.StreamOptions) error {
func (ku KubeAPIWrapper) StreamWithContext(ctx context.Context, exec remotecommand.Executor, options remotecommand.StreamOptions) error {
return exec.StreamWithContext(ctx, options)
}

Expand All @@ -166,6 +165,8 @@ func (ku KubeAPIWrapper) NewFakeAlwaysRateLimiter() flowcontrol.RateLimiter {
return flowcontrol.NewFakeAlwaysRateLimiter()
}

// KubeAPIWrapperInstance is a package level var that wraps all required kubernetes API calls.
// It is instantiated in the NewkubeWorker function and available throughout the package.
var KubeAPIWrapperInstance KubeAPIer

// ErrPodCompleted is returned when pod has already completed before we could attach.
Expand Down Expand Up @@ -309,7 +310,7 @@ func (kw *kubeUnit) kubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout

// get pod, with retry
for retries := 5; retries > 0; retries-- {
kw.pod, err = KubeAPIWrapperInstance.Get(kw.clientset, podNamespace, kw.GetContext(), podName, metav1.GetOptions{})
kw.pod, err = KubeAPIWrapperInstance.Get(kw.GetContext(), kw.clientset, podNamespace, podName, metav1.GetOptions{})
if err == nil {
break
}
Expand Down Expand Up @@ -485,7 +486,7 @@ func (kw *kubeUnit) createPod(env map[string]string) error {
}

// get pod and store to kw.pod
kw.pod, err = KubeAPIWrapperInstance.Create(kw.clientset, ked.KubeNamespace, kw.GetContext(), pod, metav1.CreateOptions{})
kw.pod, err = KubeAPIWrapperInstance.Create(kw.GetContext(), kw.clientset, ked.KubeNamespace, pod, metav1.CreateOptions{})
if err != nil {
return err
}
Expand All @@ -509,12 +510,12 @@ func (kw *kubeUnit) createPod(env map[string]string) error {
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector

return KubeAPIWrapperInstance.List(kw.clientset, ked.KubeNamespace, kw.GetContext(), options)
return KubeAPIWrapperInstance.List(kw.GetContext(), kw.clientset, ked.KubeNamespace, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector

return KubeAPIWrapperInstance.Watch(kw.clientset, ked.KubeNamespace, kw.GetContext(), options)
return KubeAPIWrapperInstance.Watch(kw.GetContext(), kw.clientset, ked.KubeNamespace, options)
},
}

Expand Down Expand Up @@ -640,7 +641,7 @@ func (kw *kubeUnit) runWorkUsingLogger() {
default:
}

kw.pod, err = KubeAPIWrapperInstance.Get(kw.clientset, podNamespace, kw.GetContext(), podName, metav1.GetOptions{})
kw.pod, err = KubeAPIWrapperInstance.Get(kw.GetContext(), kw.clientset, podNamespace, podName, metav1.GetOptions{})
if err == nil {
break
}
Expand Down Expand Up @@ -772,7 +773,7 @@ func (kw *kubeUnit) runWorkUsingLogger() {

var err error
for retries := 5; retries > 0; retries-- {
err = KubeAPIWrapperInstance.StreamWithContext(exec, kw.GetContext(), remotecommand.StreamOptions{
err = KubeAPIWrapperInstance.StreamWithContext(kw.GetContext(), exec, remotecommand.StreamOptions{
Stdin: stdin,
Tty: false,
})
Expand Down Expand Up @@ -1357,7 +1358,7 @@ func (kw *kubeUnit) Restart() error {
if err != nil {
kw.GetWorkceptor().nc.GetLogger().Warning("Pod %s could not be deleted: %s", ked.PodName, err.Error())
} else {
err := KubeAPIWrapperInstance.Delete(kw.clientset, ked.KubeNamespace, context.Background(), ked.PodName, metav1.DeleteOptions{})
err := KubeAPIWrapperInstance.Delete(kw.GetContext(), kw.clientset, ked.KubeNamespace, ked.PodName, metav1.DeleteOptions{})
if err != nil {
kw.GetWorkceptor().nc.GetLogger().Warning("Pod %s could not be deleted: %s", ked.PodName, err.Error())
}
Expand All @@ -1382,7 +1383,7 @@ func (kw *kubeUnit) Cancel() error {
kw.CancelContext()
kw.UpdateBasicStatus(WorkStateCanceled, "Canceled", -1)
if kw.pod != nil {
err := KubeAPIWrapperInstance.Delete(kw.clientset, kw.pod.Namespace, context.Background(), kw.pod.Name, metav1.DeleteOptions{})
err := KubeAPIWrapperInstance.Delete(kw.GetContext(), kw.clientset, kw.pod.Namespace, kw.pod.Name, metav1.DeleteOptions{})
if err != nil {
kw.GetWorkceptor().nc.GetLogger().Error("Error deleting pod %s: %s", kw.pod.Name, err)
}
Expand Down
95 changes: 50 additions & 45 deletions pkg/workceptor/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,75 +138,80 @@ type hasTerm struct {

func (h *hasTerm) DeepCopySelector() fields.Selector { return h }
func (h *hasTerm) Empty() bool { return true }
func (h *hasTerm) Matches(ls fields.Fields) bool { return true }
func (h *hasTerm) Matches(_ fields.Fields) bool { return true }
func (h *hasTerm) Requirements() fields.Requirements {
return []fields.Requirement{{
Field: h.field,
Operator: selection.Equals,
Value: h.value,
}}
}
func (h *hasTerm) RequiresExactMatch(field string) (value string, found bool) { return "", true }
func (h *hasTerm) String() string { return "Test" }
func (h *hasTerm) Transform(fn fields.TransformFunc) (fields.Selector, error) { return h, nil }
func (h *hasTerm) RequiresExactMatch(_ string) (value string, found bool) { return "", true }
func (h *hasTerm) String() string { return "Test" }
func (h *hasTerm) Transform(_ fields.TransformFunc) (fields.Selector, error) { return h, nil }

type ex struct {
}
type ex struct{}

func (e *ex) Stream(options remotecommand.StreamOptions) error {
func (e *ex) Stream(_ remotecommand.StreamOptions) error {
return nil
}

func (e *ex) StreamWithContext(ctx context.Context, options remotecommand.StreamOptions) error {
func (e *ex) StreamWithContext(_ context.Context, _ remotecommand.StreamOptions) error {
return nil
}

func TestKubeStart(t *testing.T) {
ku, mockbwu, mockNet, w, mockKubeAPI, ctx := createKubernetesTestSetup(t)

startTestCases := []struct {
name string
name string
expectedCalls func()
}{
{name: "test1"},
{
name: "test1",
expectedCalls: func() {
mockbwu.EXPECT().UpdateBasicStatus(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
config := rest.Config{}
mockKubeAPI.EXPECT().InClusterConfig().Return(&config, nil)
mockbwu.EXPECT().GetWorkceptor().Return(w).AnyTimes()
logger := logger.NewReceptorLogger("")
mockNet.EXPECT().GetLogger().Return(logger).AnyTimes()
clientset := kubernetes.Clientset{}
mockKubeAPI.EXPECT().NewForConfig(gomock.Any()).Return(&clientset, nil)
mockbwu.EXPECT().MonitorLocalStatus().AnyTimes()
lock := &sync.RWMutex{}
mockbwu.EXPECT().GetStatusLock().Return(lock).AnyTimes()
kubeExtraData := workceptor.KubeExtraData{}
status := workceptor.StatusFileData{ExtraData: &kubeExtraData}
mockbwu.EXPECT().GetStatusWithoutExtraData().Return(&status).AnyTimes()
mockbwu.EXPECT().GetStatusCopy().Return(status).AnyTimes()
mockbwu.EXPECT().GetContext().Return(ctx).AnyTimes()
pod := corev1.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "Test Name"}, Spec: corev1.PodSpec{}, Status: corev1.PodStatus{}}

mockKubeAPI.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes()
mockbwu.EXPECT().UpdateFullStatus(gomock.Any()).AnyTimes()

field := hasTerm{}
mockKubeAPI.EXPECT().OneTermEqualSelector(gomock.Any(), gomock.Any()).Return(&field).AnyTimes()
ev := watch.Event{Object: &pod}
mockKubeAPI.EXPECT().UntilWithSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&ev, nil).AnyTimes()
apierr := apierrors.StatusError{}
mockKubeAPI.EXPECT().NewNotFound(gomock.Any(), gomock.Any()).Return(&apierr).AnyTimes()
mockbwu.EXPECT().MonitorLocalStatus().AnyTimes()

c := rest.RESTClient{}
req := rest.NewRequest(&c)
mockKubeAPI.EXPECT().SubResource(gomock.Any(), gomock.Any(), gomock.Any()).Return(req).AnyTimes()
exec := ex{}
mockKubeAPI.EXPECT().NewSPDYExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(&exec, nil).AnyTimes()
mockbwu.EXPECT().UnitDir().Return("TestDir").AnyTimes()
},
},
}

for _, testCase := range startTestCases {
t.Run(testCase.name, func(t *testing.T) {
mockbwu.EXPECT().UpdateBasicStatus(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
config := rest.Config{}
mockKubeAPI.EXPECT().InClusterConfig().Return(&config, nil)
mockbwu.EXPECT().GetWorkceptor().Return(w).AnyTimes()
logger := logger.NewReceptorLogger("")
mockNet.EXPECT().GetLogger().Return(logger).AnyTimes()
clientset := kubernetes.Clientset{}
mockKubeAPI.EXPECT().NewForConfig(gomock.Any()).Return(&clientset, nil)
mockbwu.EXPECT().MonitorLocalStatus().AnyTimes()
lock := &sync.RWMutex{}
mockbwu.EXPECT().GetStatusLock().Return(lock).AnyTimes()
kubeExtraData := workceptor.KubeExtraData{}
status := workceptor.StatusFileData{ExtraData: &kubeExtraData}
mockbwu.EXPECT().GetStatusWithoutExtraData().Return(&status).AnyTimes()
mockbwu.EXPECT().GetStatusCopy().Return(status).AnyTimes()
mockbwu.EXPECT().GetContext().Return(ctx).AnyTimes()
pod := corev1.Pod{metav1.TypeMeta{}, metav1.ObjectMeta{Name: "Test Name"}, corev1.PodSpec{}, corev1.PodStatus{}}

mockKubeAPI.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes()
mockbwu.EXPECT().UpdateFullStatus(gomock.Any()).AnyTimes()

field := hasTerm{}
mockKubeAPI.EXPECT().OneTermEqualSelector(gomock.Any(), gomock.Any()).Return(&field).AnyTimes()
ev := watch.Event{Object: &pod}
mockKubeAPI.EXPECT().UntilWithSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&ev, nil).AnyTimes()
apierr := apierrors.StatusError{}
mockKubeAPI.EXPECT().NewNotFound(gomock.Any(), gomock.Any()).Return(&apierr).AnyTimes()
mockbwu.EXPECT().MonitorLocalStatus().AnyTimes()

c := rest.RESTClient{}
req := rest.NewRequest(&c)
mockKubeAPI.EXPECT().SubResource(gomock.Any(), gomock.Any(), gomock.Any()).Return(req).AnyTimes()
exec := ex{}
mockKubeAPI.EXPECT().NewSPDYExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(&exec, nil).AnyTimes()
mockbwu.EXPECT().UnitDir().Return("TestDir").AnyTimes()
testCase.expectedCalls()

err := ku.Start()
if err != nil {
Expand Down
Loading

0 comments on commit e4f638d

Please sign in to comment.