Skip to content

Commit

Permalink
[YUNIKORN-2029] remove deprecated function
Browse files Browse the repository at this point in the history
Signed-off-by: PoAn Yang <payang@apache.org>
  • Loading branch information
FrankYang0529 committed Oct 12, 2023
1 parent 0253b4a commit ea4334f
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 38 deletions.
11 changes: 4 additions & 7 deletions test/e2e/framework/helpers/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@ import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"os"
"os/exec"
"path/filepath"
"reflect"
"strings"
"time"

"github.com/google/uuid"
"github.com/onsi/ginkgo/v2"
Expand All @@ -56,8 +54,8 @@ func GetAbsPath(p string) (string, error) {
// GetTestName returns the test Name in a single string without spaces or /
func GetTestName() string {
//nolint
testDesc := ginkgo.CurrentGinkgoTestDescription()
name := strings.Replace(testDesc.FullTestText, " ", "_", -1)
testReport := ginkgo.CurrentSpecReport()
name := strings.Replace(testReport.FullText(), " ", "_", -1)

Check failure on line 58 in test/e2e/framework/helpers/common/utils.go

View workflow job for this annotation

GitHub Actions / build

wrapperFunc: use strings.ReplaceAll method in `strings.Replace(testReport.FullText(), " ", "_", -1)` (gocritic)
name = strings.Trim(name, "*")
return strings.Replace(name, "/", "-", -1)
}
Expand Down Expand Up @@ -101,12 +99,12 @@ func CreateLogFile(filename string, data []byte) error {
}

finalPath := filepath.Join(path, filename)
err = ioutil.WriteFile(finalPath, data, configmanager.LogPerm)
err = os.WriteFile(finalPath, data, configmanager.LogPerm)
return err
}

func GetFileContents(filename string) ([]byte, error) {
data, err := ioutil.ReadFile(filename)
data, err := os.ReadFile(filename)
return data, err
}

Expand All @@ -116,7 +114,6 @@ func GetUUID() string {

func RandSeq(n int) string {
var letters = []rune("abcdefghijklmnopqrstuvwxyz0123456789")
rand.Seed(time.Now().UnixNano())
b := make([]rune, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/framework/helpers/k8s/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func ObserveEventAfterAction(c clientset.Interface, ns string, eventPredicate fu
// Wait up 2 minutes polling every second.
timeout := 2 * time.Minute
interval := 1 * time.Second
err = wait.Poll(interval, timeout, func() (bool, error) {
err = wait.PollUntilContextTimeout(context.TODO(), interval, timeout, false, func(context.Context) (bool, error) {
return observedMatchingEvent, nil
})
return err == nil, err
Expand Down
38 changes: 19 additions & 19 deletions test/e2e/framework/helpers/k8s/k8s_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (k *KubeCtl) UpdateNamespace(namespace string, annotations map[string]strin
}

func (k *KubeCtl) WaitForServiceAccountPresent(namespace string, svcAcctName string, timeout time.Duration) error {
return wait.PollImmediate(time.Second, timeout, k.isServiceAccountPresent(namespace, svcAcctName))
return wait.PollUntilContextTimeout(context.TODO(), time.Second, timeout, false, k.isServiceAccountPresent(namespace, svcAcctName).WithContext())
}

func (k *KubeCtl) isServiceAccountPresent(namespace string, svcAcctName string) wait.ConditionFunc {
Expand Down Expand Up @@ -720,47 +720,47 @@ func (k *KubeCtl) isNumPod(namespace string, wanted int) wait.ConditionFunc {
}

func (k *KubeCtl) WaitForJobPods(namespace string, jobName string, numPods int, timeout time.Duration) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.isNumJobPodsInDesiredState(jobName, namespace, numPods, v1.PodRunning))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isNumJobPodsInDesiredState(jobName, namespace, numPods, v1.PodRunning).WithContext())
}
func (k *KubeCtl) WaitForPodEvent(namespace string, podName string, expectedReason string, timeout time.Duration) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.isPodEventTriggered(namespace, podName, expectedReason))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isPodEventTriggered(namespace, podName, expectedReason).WithContext())
}

func (k *KubeCtl) WaitForPodTerminated(namespace string, podName string, timeout time.Duration) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.isPodNotInNS(podName, namespace))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isPodNotInNS(podName, namespace).WithContext())
}

func (k *KubeCtl) WaitForJobTerminated(namespace string, jobName string, timeout time.Duration) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.isJobNotInNS(jobName, namespace))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isJobNotInNS(jobName, namespace).WithContext())
}

// Poll up to timeout seconds for pod to enter running state.
// Returns an error if the pod never enters the running state.
func (k *KubeCtl) WaitForPodRunning(namespace string, podName string, timeout time.Duration) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.isPodInDesiredState(podName, namespace, v1.PodRunning))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isPodInDesiredState(podName, namespace, v1.PodRunning).WithContext())
}

func (k *KubeCtl) WaitForPodPending(namespace string, podName string, timeout time.Duration) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.isPodInDesiredState(podName, namespace, v1.PodPending))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isPodInDesiredState(podName, namespace, v1.PodPending).WithContext())
}

func (k *KubeCtl) WaitForPodSucceeded(namespace string, podName string, timeout time.Duration) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.isPodInDesiredState(podName, namespace, v1.PodSucceeded))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isPodInDesiredState(podName, namespace, v1.PodSucceeded).WithContext())
}

func (k *KubeCtl) WaitForPodFailed(namespace string, podName string, timeout time.Duration) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.isPodInDesiredState(podName, namespace, v1.PodFailed))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isPodInDesiredState(podName, namespace, v1.PodFailed).WithContext())
}

func (k *KubeCtl) WaitForPodCount(namespace string, wanted int, timeout time.Duration) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.isNumPod(namespace, wanted))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isNumPod(namespace, wanted).WithContext())
}

func (k *KubeCtl) WaitForPodStateStable(namespace string, podName string, timeout time.Duration) (error, v1.PodPhase) {
var lastPhase v1.PodPhase
samePhases := 0

err := wait.PollImmediate(time.Second, timeout, k.isPodStable(namespace, podName, &samePhases, 3, &lastPhase))
err := wait.PollUntilContextTimeout(context.TODO(), time.Second, timeout, false, k.isPodStable(namespace, podName, &samePhases, 3, &lastPhase).WithContext())
return err, lastPhase
}

Expand Down Expand Up @@ -818,7 +818,7 @@ func (k *KubeCtl) WaitForPodBySelectorRunning(namespace string, selector string,

// Wait up to timeout seconds for a pod in 'namespace' with given 'selector' to exist
func (k *KubeCtl) WaitForPodBySelector(namespace string, selector string, timeout time.Duration) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.isPodSelectorInNs(selector, namespace))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isPodSelectorInNs(selector, namespace).WithContext())
}

func (k *KubeCtl) CreateSecret(secret *v1.Secret, namespace string) (*v1.Secret, error) {
Expand Down Expand Up @@ -920,7 +920,7 @@ func (k *KubeCtl) PodScheduled(podNamespace, podName string) wait.ConditionFunc
}

func (k *KubeCtl) WaitForPodScheduled(namespace string, podName string, timeout time.Duration) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.PodScheduled(namespace, podName))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.PodScheduled(namespace, podName).WithContext())
}

// PodUnschedulable returns a condition function that returns true if the given pod
Expand All @@ -941,7 +941,7 @@ func (k *KubeCtl) PodUnschedulable(podNamespace, podName string) wait.ConditionF
// WaitForPodUnschedulable waits for a pod to fail scheduling and returns
// an error if it does not become unschedulable within the given timeout.
func (k *KubeCtl) WaitForPodUnschedulable(pod *v1.Pod, timeout time.Duration) error {
return wait.PollImmediate(100*time.Millisecond, timeout, k.PodUnschedulable(pod.Namespace, pod.Name))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.PodUnschedulable(pod.Namespace, pod.Name).WithContext())
}

func (k *KubeCtl) CreatePriorityClass(pc *schedulingv1.PriorityClass) (*schedulingv1.PriorityClass, error) {
Expand All @@ -957,19 +957,19 @@ func (k *KubeCtl) CreateJob(job *batchv1.Job, namespace string) (*batchv1.Job, e
}

func (k *KubeCtl) WaitForJobPodsCreated(namespace string, jobName string, numPods int, timeout time.Duration) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.isNumJobPodsCreated(jobName, namespace, numPods))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isNumJobPodsCreated(jobName, namespace, numPods).WithContext())
}

func (k *KubeCtl) WaitForJobPodsRunning(namespace string, jobName string, numPods int, timeout time.Duration) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.isNumJobPodsInDesiredState(jobName, namespace, numPods, v1.PodRunning))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isNumJobPodsInDesiredState(jobName, namespace, numPods, v1.PodRunning).WithContext())
}

func (k *KubeCtl) WaitForJobPodsSucceeded(namespace string, jobName string, numPods int, timeout time.Duration) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.isNumJobPodsInDesiredState(jobName, namespace, numPods, v1.PodSucceeded))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isNumJobPodsInDesiredState(jobName, namespace, numPods, v1.PodSucceeded).WithContext())
}

func (k *KubeCtl) WaitForPlaceholders(namespace string, podPrefix string, numPods int, timeout time.Duration, podPhase *v1.PodPhase) error {
return wait.PollImmediate(time.Millisecond*100, timeout, k.isNumPlaceholdersRunning(namespace, podPrefix, numPods, podPhase))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isNumPlaceholdersRunning(namespace, podPrefix, numPods, podPhase).WithContext())
}

func (k *KubeCtl) ListPlaceholders(namespace string, podPrefix string) ([]v1.Pod, error) {
Expand All @@ -991,7 +991,7 @@ func (k *KubeCtl) ListPlaceholders(namespace string, podPrefix string) ([]v1.Pod
func (k *KubeCtl) WaitForPlaceholdersStableState(namespace string, podPrefix string, timeout time.Duration) error {
samePhases := 0
podPhases := make(map[string]v1.PodPhase)
return wait.PollImmediate(time.Second, timeout, k.arePlaceholdersStable(namespace, podPrefix, &samePhases, 3, podPhases))
return wait.PollUntilContextTimeout(context.TODO(), time.Second, timeout, false, k.arePlaceholdersStable(namespace, podPrefix, &samePhases, 3, podPhases).WithContext())
}

func (k *KubeCtl) isNumPlaceholdersRunning(namespace string, podPrefix string, num int, podPhase *v1.PodPhase) wait.ConditionFunc {
Expand Down
14 changes: 6 additions & 8 deletions test/e2e/framework/helpers/yunikorn/rest_api_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package yunikorn

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -107,10 +108,7 @@ func (c *RClient) GetHealthCheck() (dao.SchedulerHealthDAOInfo, error) {
}

func (c *RClient) WaitforQueueToAppear(partition string, queueName string, timeout int) error {
if err := wait.PollImmediate(300*time.Millisecond, time.Duration(timeout)*time.Second, c.IsQueuePresent(partition, queueName)); err != nil {
return err
}
return nil
return wait.PollUntilContextTimeout(context.TODO(), 300*time.Microsecond, time.Duration(timeout)*time.Second, false, c.IsQueuePresent(partition, queueName).WithContext())
}

func (c *RClient) IsQueuePresent(partition string, queueName string) wait.ConditionFunc {
Expand Down Expand Up @@ -198,7 +196,7 @@ func (c *RClient) isAllocLogPresent(partition string, queueName string, appID st
}

func (c *RClient) WaitForAllocationLog(partition string, queueName string, appID string, podName string, timeout int) error {
if err := wait.PollImmediate(time.Second, time.Duration(timeout)*time.Second, c.isAllocLogPresent(partition, queueName, appID, podName)); err != nil {
if err := wait.PollUntilContextTimeout(context.TODO(), time.Second, time.Duration(timeout)*time.Second, false, c.isAllocLogPresent(partition, queueName, appID, podName).WithContext()); err != nil {
return err
}

Expand Down Expand Up @@ -234,7 +232,7 @@ func (c *RClient) GetNodes(partition string) (*[]dao.NodeDAOInfo, error) {
}

func (c *RClient) WaitForAppStateTransition(partition string, queue string, appID string, state string, timeout int) error {
return wait.PollImmediate(time.Millisecond*300, time.Duration(timeout)*time.Second, c.isAppInDesiredState(partition, queue, appID, state))
return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*300, time.Duration(timeout)*time.Second, false, c.isAppInDesiredState(partition, queue, appID, state).WithContext())
}

func (c *RClient) AreAllExecPodsAllotted(partition string, queueName string, appID string, execPodCount int) wait.ConditionFunc {
Expand Down Expand Up @@ -285,7 +283,7 @@ func isRootSched(policy string) wait.ConditionFunc {
}

func WaitForSchedPolicy(policy string, timeout time.Duration) error {
return wait.PollImmediate(2*time.Second, timeout, isRootSched(policy))
return wait.PollUntilContextTimeout(context.TODO(), 2*time.Second, timeout, false, isRootSched(policy).WithContext())
}

func GetFailedHealthChecks() (string, error) {
Expand Down Expand Up @@ -339,7 +337,7 @@ func compareQueueTS(queuePathStr string, ts string) wait.ConditionFunc {

// Expects queuePath to use periods as delimiters. ie "root.queueA.child"
func WaitForQueueTS(queuePathStr string, ts string, timeout time.Duration) error {
return wait.PollImmediate(2*time.Second, timeout, compareQueueTS(queuePathStr, ts))
return wait.PollUntilContextTimeout(context.TODO(), 2*time.Second, timeout, false, compareQueueTS(queuePathStr, ts).WithContext())
}

func AllocLogToStrings(log []*dao.AllocationAskLogDAOInfo) []string {
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/resource_fairness/resource_fairness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package resourcefairness_test

import (
"context"
"fmt"
"math/rand"
"time"
Expand Down Expand Up @@ -153,7 +154,7 @@ var _ = Describe("FairScheduling:", func() {
Ω(err).NotTo(HaveOccurred())

// Wait till requests has been added to application
err := wait.PollImmediate(300*time.Millisecond, 60*time.Second, func() (bool, error) {
err := wait.PollUntilContextTimeout(context.TODO(), 300*time.Millisecond, 60*time.Second, false, func(context.Context) (bool, error) {
app, err := restClient.GetAppInfo("default", queuePath, appID)
if err != nil {
return false, nil
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package spark_jobs_scheduling

import (
"context"
"fmt"
"net/url"
"os"
Expand Down Expand Up @@ -117,7 +118,7 @@ var _ = Describe("", func() {
By(fmt.Sprintf("Get apps from specific queue: %s", sparkNS))
var appsFromQueue []*dao.ApplicationDAOInfo
// Poll for apps to appear in the queue
err = wait.PollImmediate(time.Millisecond*100, time.Duration(120)*time.Second, func() (done bool, err error) {
err = wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, time.Duration(120)*time.Second, false, func(context.Context) (done bool, err error) {
appsFromQueue, err = restClient.GetApps(configmanager.DefaultPartition, configmanager.RootQueue+"."+sparkNS)
if len(appsFromQueue) == 3 {
return true, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package stateawareappscheduling_test

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -72,7 +73,7 @@ var _ = Describe("DripFeedSchedule:", func() {
By(fmt.Sprintf("Get apps from specific queue: %s", ns))
var appsFromQueue []*dao.ApplicationDAOInfo
// Poll for apps to appear in the queue
err = wait.PollImmediate(time.Second, time.Duration(60)*time.Second, func() (done bool, err error) {
err = wait.PollUntilContextTimeout(context.TODO(), time.Second, time.Duration(60)*time.Second, false, func(context.Context) (done bool, err error) {
appsFromQueue, err = restClient.GetApps("default", "root."+ns)
if len(appsFromQueue) == 3 {
return true, nil
Expand Down

0 comments on commit ea4334f

Please sign in to comment.