Skip to content
This repository has been archived by the owner on May 25, 2023. It is now read-only.

Commit

Permalink
Removed namespaceAsQueue.
Browse files Browse the repository at this point in the history
Signed-off-by: Da K. Ma <klaus1982.cn@gmail.com>
  • Loading branch information
k82cn committed Mar 3, 2019
1 parent ff790cd commit f19d32f
Show file tree
Hide file tree
Showing 23 changed files with 222 additions and 385 deletions.
16 changes: 1 addition & 15 deletions cmd/kube-batch/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,12 @@ type ServerOption struct {
SchedulerName string
SchedulerConf string
SchedulePeriod string
NamespaceAsQueue bool
EnableLeaderElection bool
LockObjectNamespace string
DefaultQueue string
PrintVersion bool
}

var (
opts *ServerOption
)

func Options() *ServerOption {
if opts == nil {
opts = &ServerOption{}
}
return opts
}

// NewServerOption creates a new CMServer with a default config.
func NewServerOption() *ServerOption {
s := ServerOption{}
Expand All @@ -62,12 +50,10 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.SchedulerName, "scheduler-name", "kube-batch", "kube-batch will handle pods with the scheduler-name")
fs.StringVar(&s.SchedulerConf, "scheduler-conf", "", "The absolute path of scheduler configuration file")
fs.StringVar(&s.SchedulePeriod, "schedule-period", "1s", "The period between each scheduling cycle")
fs.StringVar(&s.DefaultQueue, "default-queue", "", "The name of the queue to fall-back to instead of namespace name")
fs.StringVar(&s.DefaultQueue, "default-queue", "default", "The default queue name of the job")
fs.BoolVar(&s.EnableLeaderElection, "leader-elect", s.EnableLeaderElection,
"Start a leader election client and gain leadership before "+
"executing the main loop. Enable this when running replicated kube-batch for high availability")
fs.BoolVar(&s.NamespaceAsQueue, "enable-namespace-as-queue", true, "Make Namespace as Queue with weight one, "+
"but kube-batch will not handle Queue CRD anymore")
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object")
}
Expand Down
7 changes: 5 additions & 2 deletions cmd/kube-batch/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ func Run(opt *options.ServerOption) error {
}

// Start policy controller to allocate resources.
sched, err := scheduler.NewScheduler(config, opt.SchedulerName,
opt.SchedulerConf, opt.SchedulePeriod, opt.NamespaceAsQueue)
sched, err := scheduler.NewScheduler(config,
opt.SchedulerName,
opt.SchedulerConf,
opt.SchedulePeriod,
opt.DefaultQueue)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/kube-batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes")

func main() {
s := options.Options()
s := options.NewServerOption()
s.AddFlags(pflag.CommandLine)

flag.InitFlags()
Expand Down
11 changes: 11 additions & 0 deletions config/kube-batch-conf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
actions: "reclaim, allocate, backfill, preempt"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- plugins:
- name: drf
- name: predicates
- name: proportion
- name: prioritize
6 changes: 6 additions & 0 deletions config/queue/default.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
apiVersion: scheduling.incubator.k8s.io/v1alpha1
kind: Queue
metadata:
name: default
spec:
weight: 1
1 change: 1 addition & 0 deletions example/kube-batch-conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- plugins:
- name: drf
- name: predicates
Expand Down
16 changes: 4 additions & 12 deletions hack/run-e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,22 @@
export PATH="${HOME}/.kubeadm-dind-cluster:${PATH}"
export KA_BIN=_output/bin
export LOG_LEVEL=3
export NUM_NODES=4
export NUM_NODES=3

dind_url=https://cdn.rawgit.com/kubernetes-sigs/kubeadm-dind-cluster/master/fixed/dind-cluster-v1.13.sh
dind_dest=./hack/dind-cluster-v1.13.sh

if [ $(echo $RANDOM%2 | bc) -eq 1 ]
then
enable_namespace_as_queue=true
else
enable_namespace_as_queue=false
fi

export ENABLE_NAMESPACES_AS_QUEUE=$enable_namespace_as_queue

# start k8s dind cluster
curl ${dind_url} --output ${dind_dest}
chmod +x ${dind_dest}
${dind_dest} up

kubectl create -f config/crds/scheduling_v1alpha1_podgroup.yaml
kubectl create -f config/crds/scheduling_v1alpha1_queue.yaml
kubectl create -f config/queue/default.yaml

# start kube-batch
nohup ${KA_BIN}/kube-batch --kubeconfig ${HOME}/.kube/config --scheduler-conf=example/kube-batch-conf.yaml --enable-namespace-as-queue=${ENABLE_NAMESPACES_AS_QUEUE} --logtostderr --v ${LOG_LEVEL} > scheduler.log 2>&1 &
nohup ${KA_BIN}/kube-batch --kubeconfig ${HOME}/.kube/config --scheduler-conf=config/kube-batch-conf.yaml --logtostderr --v ${LOG_LEVEL} > scheduler.log 2>&1 &

# clean up
function cleanup {
Expand All @@ -43,4 +35,4 @@ function cleanup {
trap cleanup EXIT

# Run e2e test
go test ./test/e2e -v
go test ./test/e2e -v -timeout 30m
9 changes: 9 additions & 0 deletions pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func TestAllocate(t *testing.T) {
Name: "pg1",
Namespace: "c1",
},
Spec: kbv1.PodGroupSpec{
Queue: "c1",
},
},
},
pods: []*v1.Pod{
Expand Down Expand Up @@ -190,12 +193,18 @@ func TestAllocate(t *testing.T) {
Name: "pg1",
Namespace: "c1",
},
Spec: kbv1.PodGroupSpec{
Queue: "c1",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pg2",
Namespace: "c2",
},
Spec: kbv1.PodGroupSpec{
Queue: "c2",
},
},
},

Expand Down
21 changes: 12 additions & 9 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ func preempt(
nodes map[string]*api.NodeInfo,
filter func(*api.TaskInfo) bool,
) (bool, error) {
resreq := preemptor.Resreq.Clone()
preempted := api.EmptyResource()
predicateNodes := []*api.NodeInfo{}
nodeScores := map[int][]*api.NodeInfo{}
assigned := false
Expand Down Expand Up @@ -203,6 +201,9 @@ func preempt(
preemptor.Namespace, preemptor.Name, node.Name)

var preemptees []*api.TaskInfo
preempted := api.EmptyResource()
resreq := preemptor.Resreq.Clone()

for _, task := range node.Tasks {
if filter == nil {
preemptees = append(preemptees, task.Clone())
Expand Down Expand Up @@ -237,15 +238,17 @@ func preempt(
glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.",
preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq)

if err := stmt.Pipeline(preemptor, node.Name); err != nil {
glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>",
preemptor.Namespace, preemptor.Name, node.Name)
}
if preemptor.Resreq.LessEqual(preempted) {
if err := stmt.Pipeline(preemptor, node.Name); err != nil {
glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>",
preemptor.Namespace, preemptor.Name, node.Name)
}

// Ignore pipeline error, will be corrected in next scheduling loop.
assigned = true
// Ignore pipeline error, will be corrected in next scheduling loop.
assigned = true

break
break
}
}

return assigned, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,6 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
task = tasks.Pop().(*api.TaskInfo)
}

resreq := task.Resreq.Clone()
reclaimed := api.EmptyResource()

assigned := false

for _, n := range ssn.Nodes {
Expand All @@ -120,6 +117,9 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
continue
}

resreq := task.Resreq.Clone()
reclaimed := api.EmptyResource()

glog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
task.Namespace, task.Name, n.Name)

Expand Down
61 changes: 8 additions & 53 deletions pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options"
"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/apis/utils"
)

type TaskID types.UID
Expand All @@ -40,8 +38,6 @@ type TaskInfo struct {
Name string
Namespace string

PodGroup *v1alpha1.PodGroup

Resreq *Resource

NodeName string
Expand All @@ -52,34 +48,16 @@ type TaskInfo struct {
Pod *v1.Pod
}

func getOwners(pod *v1.Pod) (JobID, *v1alpha1.PodGroup) {
func getJobID(pod *v1.Pod) JobID {
if len(pod.Annotations) != 0 {
if gn, found := pod.Annotations[v1alpha1.GroupNameAnnotationKey]; found && len(gn) != 0 {
// Make sure Pod and PodGroup belong to the same namespace.
jobID := fmt.Sprintf("%s/%s", pod.Namespace, gn)
return JobID(jobID), nil
return JobID(jobID)
}
}

jobID := JobID(utils.GetController(pod))
if len(jobID) == 0 {
jobID = JobID(pod.UID)
}

pg := &v1alpha1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: pod.Namespace,
Name: string(jobID),
Annotations: map[string]string{
ShadowPodGroupKey: string(jobID),
},
},
Spec: v1alpha1.PodGroupSpec{
MinMember: 1,
},
}

return jobID, pg
return ""
}

func NewTaskInfo(pod *v1.Pod) *TaskInfo {
Expand All @@ -90,12 +68,11 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
req.Add(NewResource(c.Resources.Requests))
}

jobID, pg := getOwners(pod)
jobID := getJobID(pod)

ti := &TaskInfo{
UID: TaskID(pod.UID),
Job: jobID,
PodGroup: pg,
Name: pod.Name,
Namespace: pod.Namespace,
NodeName: pod.Spec.NodeName,
Expand All @@ -118,7 +95,6 @@ func (ti *TaskInfo) Clone() *TaskInfo {
Job: ti.Job,
Name: ti.Name,
Namespace: ti.Namespace,
PodGroup: ti.PodGroup,
NodeName: ti.NodeName,
Status: ti.Status,
Priority: ti.Priority,
Expand All @@ -129,8 +105,8 @@ func (ti *TaskInfo) Clone() *TaskInfo {
}

func (ti TaskInfo) String() string {
return fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, type %v, pri %v, resreq %v",
ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.PodGroup, ti.Priority, ti.Resreq)
return fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, pri %v, resreq %v",
ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.Priority, ti.Resreq)
}

// JobID is the type of JobInfo's ID.
Expand Down Expand Up @@ -198,33 +174,16 @@ func (ji *JobInfo) SetPodGroup(pg *v1alpha1.PodGroup) {
ji.Name = pg.Name
ji.Namespace = pg.Namespace
ji.MinAvailable = pg.Spec.MinMember

//set queue name based on the available information
//in the following priority order:
// 1. queue name from PodGroup spec (if available)
// 2. queue name from default-queue command line option (if specified)
// 3. namespace name
if len(pg.Spec.Queue) > 0 {
ji.Queue = QueueID(pg.Spec.Queue)
} else if len(options.Options().DefaultQueue) > 0 {
ji.Queue = QueueID(options.Options().DefaultQueue)
} else {
ji.Queue = QueueID(pg.Namespace)
}

ji.Queue = QueueID(pg.Spec.Queue)
ji.CreationTimestamp = pg.GetCreationTimestamp()

ji.PodGroup = pg
}

func (ji *JobInfo) SetPDB(pdb *policyv1.PodDisruptionBudget) {
ji.Name = pdb.Name
ji.MinAvailable = pdb.Spec.MinAvailable.IntVal
ji.Namespace = pdb.Namespace
if len(options.Options().DefaultQueue) == 0 {
ji.Queue = QueueID(pdb.Namespace)
} else {
ji.Queue = QueueID(options.Options().DefaultQueue)
}

ji.CreationTimestamp = pdb.GetCreationTimestamp()
ji.PDB = pdb
Expand Down Expand Up @@ -265,10 +224,6 @@ func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) {
if AllocatedStatus(ti.Status) {
ji.Allocated.Add(ti.Resreq)
}

if ji.PodGroup == nil && ti.PodGroup != nil {
ji.SetPodGroup(ti.PodGroup)
}
}

func (ji *JobInfo) UpdateTaskStatus(task *TaskInfo, status TaskStatus) error {
Expand Down
Loading

0 comments on commit f19d32f

Please sign in to comment.