diff --git a/Makefile b/Makefile index c54662cf8b..302d52e2c7 100644 --- a/Makefile +++ b/Makefile @@ -105,7 +105,7 @@ generate-code: manifests: controller-gen go mod vendor # volcano crd base - $(CONTROLLER_GEN) $(CRD_OPTIONS) paths="./vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1;./vendor/volcano.sh/apis/pkg/apis/batch/v1alpha1;./vendor/volcano.sh/apis/pkg/apis/bus/v1alpha1;./vendor/volcano.sh/apis/pkg/apis/nodeinfo/v1alpha1" output:crd:artifacts:config=config/crd/volcano/bases + $(CONTROLLER_GEN) $(CRD_OPTIONS) paths="./vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1;./vendor/volcano.sh/apis/pkg/apis/batch/v1alpha1;./vendor/volcano.sh/apis/pkg/apis/bus/v1alpha1;./vendor/volcano.sh/apis/pkg/apis/nodeinfo/v1alpha1;./vendor/volcano.sh/apis/pkg/apis/topology/v1alpha1" output:crd:artifacts:config=config/crd/volcano/bases # generate volcano job crd yaml without description to avoid yaml size limit when using `kubectl apply` $(CONTROLLER_GEN) $(CRD_OPTIONS_EXCLUDE_DESCRIPTION) paths="./vendor/volcano.sh/apis/pkg/apis/batch/v1alpha1" output:crd:artifacts:config=config/crd/volcano/bases # jobflow crd base diff --git a/config/crd/jobflow/bases/flow.volcano.sh_jobtemplates.yaml b/config/crd/jobflow/bases/flow.volcano.sh_jobtemplates.yaml index 2555f5aa9f..6d442f8b1e 100644 --- a/config/crd/jobflow/bases/flow.volcano.sh_jobtemplates.yaml +++ b/config/crd/jobflow/bases/flow.volcano.sh_jobtemplates.yaml @@ -38,6 +38,18 @@ spec: format: int32 minimum: 1 type: integer + networkTopology: + properties: + highestTierAllowed: + default: 1 + type: integer + mode: + default: hard + enum: + - hard + - soft + type: string + type: object plugins: additionalProperties: items: diff --git a/config/crd/volcano/bases/batch.volcano.sh_jobs.yaml b/config/crd/volcano/bases/batch.volcano.sh_jobs.yaml index cbb78a3528..327372df72 100644 --- a/config/crd/volcano/bases/batch.volcano.sh_jobs.yaml +++ b/config/crd/volcano/bases/batch.volcano.sh_jobs.yaml @@ -56,6 +56,18 @@ spec: format: int32 minimum: 1 type: integer + networkTopology: + properties: + highestTierAllowed: + default: 1 + type: integer + mode: + default: hard + enum: + - hard + - soft + type: string + type: object plugins: additionalProperties: items: diff --git a/config/crd/volcano/bases/scheduling.volcano.sh_podgroups.yaml b/config/crd/volcano/bases/scheduling.volcano.sh_podgroups.yaml index 503122dd20..867ced09f7 100644 --- a/config/crd/volcano/bases/scheduling.volcano.sh_podgroups.yaml +++ b/config/crd/volcano/bases/scheduling.volcano.sh_podgroups.yaml @@ -89,6 +89,24 @@ spec: if there's not enough resources to start each task, the scheduler will not start anyone. type: object + networkTopology: + description: NetworkTopology defines the NetworkTopology config, this + field works in conjunction with network topology feature and hyperNode + CRD. + properties: + highestTierAllowed: + default: 1 + description: HighestTierAllowed specifies the highest tier that + a job allowed to cross when scheduling. + type: integer + mode: + default: hard + description: Mode specifies the mode of the network topology constrain. + enum: + - hard + - soft + type: string + type: object priorityClassName: description: |- If specified, indicates the PodGroup's priority. 
"system-node-critical" and diff --git a/config/crd/volcano/bases/topology.volcano.sh_hypernodes.yaml b/config/crd/volcano/bases/topology.volcano.sh_hypernodes.yaml new file mode 100644 index 0000000000..37a7133637 --- /dev/null +++ b/config/crd/volcano/bases/topology.volcano.sh_hypernodes.yaml @@ -0,0 +1,177 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.16.4 + name: hypernodes.topology.volcano.sh +spec: + group: topology.volcano.sh + names: + kind: HyperNode + listKind: HyperNodeList + plural: hypernodes + shortNames: + - hn + singular: hypernode + scope: Cluster + versions: + - additionalPrinterColumns: + - jsonPath: .spec.tier + name: Tier + type: string + - jsonPath: .status.nodeCount + name: NodeCount + type: integer + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1alpha1 + schema: + openAPIV3Schema: + description: HyperNode represents a collection of nodes sharing similar network + topology or performance characteristics. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: Spec defines the desired configuration of the HyperNode. + properties: + members: + description: Members defines a list of node groups or individual nodes + included in the HyperNode. + items: + description: MemberSpec represents a specific node or a hyperNodes + in the hyperNode. + properties: + selector: + description: Selector defines the selection rules for this member. + properties: + exactMatch: + description: ExactMatch defines the exact match criteria + (required when Type is "Exact"). + properties: + name: + description: Name specifies the exact name of the node + to match. + type: string + type: object + regexMatch: + description: RegexMatch defines the regex match criteria + (required when Type is "Regex"). + properties: + pattern: + description: Pattern defines the regex pattern to match + node names. + type: string + type: object + type: object + x-kubernetes-validations: + - message: Either ExactMatch or RegexMatch must be specified + rule: self.exactMatch != null || self.regexMatch != null + - message: ExactMatch and RegexMatch cannot be specified together + rule: '!(self.exactMatch != null && self.regexMatch != null)' + type: + description: Type specifies the member type. + enum: + - Node + - HyperNode + type: string + required: + - type + type: object + type: array + tier: + description: Tier categorizes the performance level of the HyperNode. + type: string + required: + - tier + type: object + status: + description: Status provides the current state of the HyperNode. + properties: + conditions: + description: Conditions provide details about the current state of + the HyperNode. 
+ items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + nodeCount: + description: NodeCount is the total number of nodes currently in the + HyperNode. 
+ format: int64 + minimum: 0 + type: integer + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/go.mod b/go.mod index 6c76acf6b1..af4d2e14d9 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( sigs.k8s.io/controller-runtime v0.13.0 sigs.k8s.io/yaml v1.4.0 stathat.com/c/consistent v1.0.0 - volcano.sh/apis v1.10.0-alpha.0.0.20241016111016-bb93758bd51f + volcano.sh/apis v1.10.0-alpha.0.0.20241218081838-e5d361b6bfbe ) require ( diff --git a/go.sum b/go.sum index b8b18be173..7791857a2e 100644 --- a/go.sum +++ b/go.sum @@ -510,5 +510,5 @@ sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= -volcano.sh/apis v1.10.0-alpha.0.0.20241016111016-bb93758bd51f h1:wqvGQgzYCPJSS07xE1LZbJ/Mxb1f/xFWThnII6BzMhg= -volcano.sh/apis v1.10.0-alpha.0.0.20241016111016-bb93758bd51f/go.mod h1:XHIjTlHDMZTLRg2Y2JAkj85iP0iiet2tv+HfPQZrsHs= +volcano.sh/apis v1.10.0-alpha.0.0.20241218081838-e5d361b6bfbe h1:iHd1Xt36a7S47IFksuF0h9W9J4LKzhBEz0C9XbkBvB8= +volcano.sh/apis v1.10.0-alpha.0.0.20241218081838-e5d361b6bfbe/go.mod h1:XHIjTlHDMZTLRg2Y2JAkj85iP0iiet2tv+HfPQZrsHs= diff --git a/hack/generate-yaml.sh b/hack/generate-yaml.sh index c134512372..53cd952ea3 100755 --- a/hack/generate-yaml.sh +++ b/hack/generate-yaml.sh @@ -93,6 +93,7 @@ tail -n +2 ${VOLCANO_CRD_DIR}/bases/bus.volcano.sh_commands.yaml > ${HELM_VOLCAN tail -n +2 ${VOLCANO_CRD_DIR}/bases/scheduling.volcano.sh_podgroups.yaml > ${HELM_VOLCANO_CRD_DIR}/bases/scheduling.volcano.sh_podgroups.yaml tail -n +2 ${VOLCANO_CRD_DIR}/bases/scheduling.volcano.sh_queues.yaml > ${HELM_VOLCANO_CRD_DIR}/bases/scheduling.volcano.sh_queues.yaml tail -n +2 ${VOLCANO_CRD_DIR}/bases/nodeinfo.volcano.sh_numatopologies.yaml > ${HELM_VOLCANO_CRD_DIR}/bases/nodeinfo.volcano.sh_numatopologies.yaml +tail -n +2 ${VOLCANO_CRD_DIR}/bases/topology.volcano.sh_hypernodes.yaml > ${HELM_VOLCANO_CRD_DIR}/bases/topology.volcano.sh_hypernodes.yaml # sync jobflow bases tail -n +2 ${JOBFLOW_CRD_DIR}/bases/flow.volcano.sh_jobflows.yaml > ${HELM_JOBFLOW_CRD_DIR}/bases/flow.volcano.sh_jobflows.yaml @@ -136,6 +137,7 @@ ${HELM_BIN_DIR}/helm template ${VK_ROOT}/installer/helm/chart/volcano --namespac -s templates/scheduling_v1beta1_podgroup.yaml \ -s templates/scheduling_v1beta1_queue.yaml \ -s templates/nodeinfo_v1alpha1_numatopologies.yaml \ + -s templates/topology_v1alpha1_hypernodes.yaml \ -s templates/webhooks.yaml \ >> ${DEPLOYMENT_FILE} diff --git a/installer/helm/chart/volcano/charts/jobflow/crd/bases/flow.volcano.sh_jobtemplates.yaml b/installer/helm/chart/volcano/charts/jobflow/crd/bases/flow.volcano.sh_jobtemplates.yaml index 2702917f52..449878e171 100644 --- a/installer/helm/chart/volcano/charts/jobflow/crd/bases/flow.volcano.sh_jobtemplates.yaml +++ b/installer/helm/chart/volcano/charts/jobflow/crd/bases/flow.volcano.sh_jobtemplates.yaml @@ -37,6 +37,18 @@ spec: format: int32 minimum: 1 type: integer + networkTopology: + properties: + highestTierAllowed: + default: 1 + type: integer + mode: + default: hard + enum: + - hard + - soft + type: string + type: object plugins: additionalProperties: items: diff --git a/installer/helm/chart/volcano/crd/bases/batch.volcano.sh_jobs.yaml b/installer/helm/chart/volcano/crd/bases/batch.volcano.sh_jobs.yaml index 2c82e9526d..7c5c27cf78 100644 
--- a/installer/helm/chart/volcano/crd/bases/batch.volcano.sh_jobs.yaml +++ b/installer/helm/chart/volcano/crd/bases/batch.volcano.sh_jobs.yaml @@ -55,6 +55,18 @@ spec: format: int32 minimum: 1 type: integer + networkTopology: + properties: + highestTierAllowed: + default: 1 + type: integer + mode: + default: hard + enum: + - hard + - soft + type: string + type: object plugins: additionalProperties: items: diff --git a/installer/helm/chart/volcano/crd/bases/scheduling.volcano.sh_podgroups.yaml b/installer/helm/chart/volcano/crd/bases/scheduling.volcano.sh_podgroups.yaml index 0035d4e6b6..6104675fea 100644 --- a/installer/helm/chart/volcano/crd/bases/scheduling.volcano.sh_podgroups.yaml +++ b/installer/helm/chart/volcano/crd/bases/scheduling.volcano.sh_podgroups.yaml @@ -88,6 +88,24 @@ spec: if there's not enough resources to start each task, the scheduler will not start anyone. type: object + networkTopology: + description: NetworkTopology defines the NetworkTopology config, this + field works in conjunction with network topology feature and hyperNode + CRD. + properties: + highestTierAllowed: + default: 1 + description: HighestTierAllowed specifies the highest tier that + a job allowed to cross when scheduling. + type: integer + mode: + default: hard + description: Mode specifies the mode of the network topology constrain. + enum: + - hard + - soft + type: string + type: object priorityClassName: description: |- If specified, indicates the PodGroup's priority. "system-node-critical" and diff --git a/installer/helm/chart/volcano/crd/bases/topology.volcano.sh_hypernodes.yaml b/installer/helm/chart/volcano/crd/bases/topology.volcano.sh_hypernodes.yaml new file mode 100644 index 0000000000..98cb1a1fc2 --- /dev/null +++ b/installer/helm/chart/volcano/crd/bases/topology.volcano.sh_hypernodes.yaml @@ -0,0 +1,176 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.16.4 + name: hypernodes.topology.volcano.sh +spec: + group: topology.volcano.sh + names: + kind: HyperNode + listKind: HyperNodeList + plural: hypernodes + shortNames: + - hn + singular: hypernode + scope: Cluster + versions: + - additionalPrinterColumns: + - jsonPath: .spec.tier + name: Tier + type: string + - jsonPath: .status.nodeCount + name: NodeCount + type: integer + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1alpha1 + schema: + openAPIV3Schema: + description: HyperNode represents a collection of nodes sharing similar network + topology or performance characteristics. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: Spec defines the desired configuration of the HyperNode. + properties: + members: + description: Members defines a list of node groups or individual nodes + included in the HyperNode. 
+ items: + description: MemberSpec represents a specific node or a hyperNodes + in the hyperNode. + properties: + selector: + description: Selector defines the selection rules for this member. + properties: + exactMatch: + description: ExactMatch defines the exact match criteria + (required when Type is "Exact"). + properties: + name: + description: Name specifies the exact name of the node + to match. + type: string + type: object + regexMatch: + description: RegexMatch defines the regex match criteria + (required when Type is "Regex"). + properties: + pattern: + description: Pattern defines the regex pattern to match + node names. + type: string + type: object + type: object + x-kubernetes-validations: + - message: Either ExactMatch or RegexMatch must be specified + rule: self.exactMatch != null || self.regexMatch != null + - message: ExactMatch and RegexMatch cannot be specified together + rule: '!(self.exactMatch != null && self.regexMatch != null)' + type: + description: Type specifies the member type. + enum: + - Node + - HyperNode + type: string + required: + - type + type: object + type: array + tier: + description: Tier categorizes the performance level of the HyperNode. + type: string + required: + - tier + type: object + status: + description: Status provides the current state of the HyperNode. + properties: + conditions: + description: Conditions provide details about the current state of + the HyperNode. + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + nodeCount: + description: NodeCount is the total number of nodes currently in the + HyperNode. 
+ format: int64 + minimum: 0 + type: integer + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/installer/helm/chart/volcano/templates/topology_v1alpha1_hypernodes.yaml b/installer/helm/chart/volcano/templates/topology_v1alpha1_hypernodes.yaml new file mode 100644 index 0000000000..6175850081 --- /dev/null +++ b/installer/helm/chart/volcano/templates/topology_v1alpha1_hypernodes.yaml @@ -0,0 +1 @@ +{{- tpl ($.Files.Get (printf "crd/%s/topology.volcano.sh_hypernodes.yaml" (include "crd_version" .))) . }} \ No newline at end of file diff --git a/installer/volcano-development.yaml b/installer/volcano-development.yaml index 52a10b38c0..dfff3630b6 100644 --- a/installer/volcano-development.yaml +++ b/installer/volcano-development.yaml @@ -294,6 +294,18 @@ spec: format: int32 minimum: 1 type: integer + networkTopology: + properties: + highestTierAllowed: + default: 1 + type: integer + mode: + default: hard + enum: + - hard + - soft + type: string + type: object plugins: additionalProperties: items: @@ -4701,6 +4713,24 @@ spec: if there's not enough resources to start each task, the scheduler will not start anyone. type: object + networkTopology: + description: NetworkTopology defines the NetworkTopology config, this + field works in conjunction with network topology feature and hyperNode + CRD. + properties: + highestTierAllowed: + default: 1 + description: HighestTierAllowed specifies the highest tier that + a job allowed to cross when scheduling. + type: integer + mode: + default: hard + description: Mode specifies the mode of the network topology constrain. + enum: + - hard + - soft + type: string + type: object priorityClassName: description: |- If specified, indicates the PodGroup's priority. "system-node-critical" and @@ -5079,6 +5109,184 @@ spec: served: true storage: true --- +# Source: volcano/templates/topology_v1alpha1_hypernodes.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.16.4 + name: hypernodes.topology.volcano.sh +spec: + group: topology.volcano.sh + names: + kind: HyperNode + listKind: HyperNodeList + plural: hypernodes + shortNames: + - hn + singular: hypernode + scope: Cluster + versions: + - additionalPrinterColumns: + - jsonPath: .spec.tier + name: Tier + type: string + - jsonPath: .status.nodeCount + name: NodeCount + type: integer + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1alpha1 + schema: + openAPIV3Schema: + description: HyperNode represents a collection of nodes sharing similar network + topology or performance characteristics. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: Spec defines the desired configuration of the HyperNode. 
+ properties: + members: + description: Members defines a list of node groups or individual nodes + included in the HyperNode. + items: + description: MemberSpec represents a specific node or a hyperNodes + in the hyperNode. + properties: + selector: + description: Selector defines the selection rules for this member. + properties: + exactMatch: + description: ExactMatch defines the exact match criteria + (required when Type is "Exact"). + properties: + name: + description: Name specifies the exact name of the node + to match. + type: string + type: object + regexMatch: + description: RegexMatch defines the regex match criteria + (required when Type is "Regex"). + properties: + pattern: + description: Pattern defines the regex pattern to match + node names. + type: string + type: object + type: object + x-kubernetes-validations: + - message: Either ExactMatch or RegexMatch must be specified + rule: self.exactMatch != null || self.regexMatch != null + - message: ExactMatch and RegexMatch cannot be specified together + rule: '!(self.exactMatch != null && self.regexMatch != null)' + type: + description: Type specifies the member type. + enum: + - Node + - HyperNode + type: string + required: + - type + type: object + type: array + tier: + description: Tier categorizes the performance level of the HyperNode. + type: string + required: + - tier + type: object + status: + description: Status provides the current state of the HyperNode. + properties: + conditions: + description: Conditions provide details about the current state of + the HyperNode. + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. 
+ maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + nodeCount: + description: NodeCount is the total number of nodes currently in the + HyperNode. + format: int64 + minimum: 0 + type: integer + type: object + type: object + served: true + storage: true + subresources: + status: {} +--- # Source: volcano/templates/webhooks.yaml apiVersion: admissionregistration.k8s.io/v1 kind: MutatingWebhookConfiguration @@ -5392,6 +5600,18 @@ spec: format: int32 minimum: 1 type: integer + networkTopology: + properties: + highestTierAllowed: + default: 1 + type: integer + mode: + default: hard + enum: + - hard + - soft + type: string + type: object plugins: additionalProperties: items: diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 3c99eefc1b..8e4bba4c6d 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -17,6 +17,7 @@ package allocate import ( + "sort" "time" "k8s.io/klog/v2" @@ -33,11 +34,19 @@ type Action struct { session *framework.Session // configured flag for error cache enablePredicateErrorCache bool + hyperNodesTiers []int + + // hyperNodeScoresByJob stores job total score for all available hyperNodes, this is used for accumulate + // all nodes' scores in each available hyperNode only when job has hard network topology constrains + // jobUID -> hyperNodeName -> score + hyperNodeScoresByJob map[string]map[string]float64 } func New() *Action { return &Action{ enablePredicateErrorCache: true, // default to enable it + hyperNodesTiers: []int{}, + hyperNodeScoresByJob: make(map[string]map[string]float64), } } @@ -52,11 +61,26 @@ func (alloc *Action) parseArguments(ssn *framework.Session) { arguments.GetBool(&alloc.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey) } +func (alloc *Action) parseHyperNodesTiers(ssn *framework.Session) { + if ssn.HyperNodesListByTier == nil || len(ssn.HyperNodesListByTier) == 0 { + return + } + + // sort to guarantee the traverse order is from down to top. + var tiers []int + for tier := range ssn.HyperNodesListByTier { + tiers = append(tiers, tier) + } + sort.Ints(tiers) + alloc.hyperNodesTiers = tiers +} + func (alloc *Action) Execute(ssn *framework.Session) { klog.V(5).Infof("Enter Allocate ...") defer klog.V(5).Infof("Leaving Allocate ...") alloc.parseArguments(ssn) + alloc.parseHyperNodesTiers(ssn) // the allocation for pod may have many stages // 1. pick a queue named Q (using ssn.QueueOrderFn) @@ -176,7 +200,27 @@ func (alloc *Action) allocateResources(queues *util.PriorityQueue, jobsMap map[a klog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v/%v>", tasks.Len(), job.Namespace, job.Name) - alloc.allocateResourcesForTasks(tasks, job, jobs, queue, allNodes) + hardMode, highestAllowedTier := job.HasTopologyHardConstrain() + var stmt *framework.Statement + var tasksQueue *util.PriorityQueue + if hardMode { + stmt, tasksQueue = alloc.allocateResourceForTasksWithTopology(tasks, job, queue, highestAllowedTier) + // There are still left tasks that need to be allocated when min available < replicas, put the job back and set pending tasks. 
+ if tasksQueue != nil { + jobs.Push(job) + pendingTasks[job.UID] = tasksQueue + } + } else { + stmt = alloc.allocateResourcesForTasks(tasks, job, queue, allNodes, "") + // There are still left tasks that need to be allocated when min available < replicas, put the job back + if tasks.Len() > 0 { + jobs.Push(job) + } + } + + if stmt != nil { + stmt.Commit() + } // Put back the queue to priority queue after job's resource allocating finished, // To ensure that the priority of the queue is calculated based on the latest resource allocation situation. @@ -184,14 +228,127 @@ func (alloc *Action) allocateResources(queues *util.PriorityQueue, jobsMap map[a } } -func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *api.JobInfo, jobs *util.PriorityQueue, queue *api.QueueInfo, allNodes []*api.NodeInfo) { +func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQueue, job *api.JobInfo, queue *api.QueueInfo, highestAllowedTier int) (*framework.Statement, *util.PriorityQueue) { + jobStmtsByTier := make(map[int]map[string]*framework.Statement) + hyperNodesWithLeftTasks := make(map[string]*util.PriorityQueue) + ssn := alloc.session + selectedTier := 0 + + // Find a suitable hyperNode in one tier from down to top everytime to ensure that the selected hyperNode spans the least tier. + for index, tier := range alloc.hyperNodesTiers { + if index+1 > highestAllowedTier { + klog.V(4).ErrorS(nil, "Skip search for higher tier cause highest allowed tier reached", "jobName", job.UID, "highestAllowedTier", highestAllowedTier, "tier", tier) + break + } + if len(jobStmtsByTier) > 0 { + klog.V(4).InfoS("Skip search for higher tier cause has found a suitable one", "tier", tier) + break + } + for _, hyperNodeName := range ssn.HyperNodesListByTier[tier] { + nodes, ok := ssn.HyperNodes[hyperNodeName] + if !ok { + klog.ErrorS(nil, "HyperNode not exists.", "jobName", job.UID, "name", hyperNodeName, "tier", tier) + continue + } + + // Clone tasks queue and rest job's fit err to make sure it's a clean cache when everytime filter a hyperNode and do not affect each other between hyperNodes. + tasksQueue := tasks.Clone() + job.ResetFitErr() + klog.V(3).InfoS("Try to allocate resource for job in hyperNode", "jobName", job.UID, "hyperNodeName", hyperNodeName, "tier", tier) + stmt := alloc.allocateResourcesForTasks(tasksQueue, job, queue, nodes, hyperNodeName) + if stmt == nil { + klog.V(4).InfoS("Cannot allocate resources for job with network topology constrains", "jobName", job.UID, "hyperNodeName", hyperNodeName, "tier", tier) + continue + } + + // Find an available hyperNode. + if _, ok = jobStmtsByTier[tier]; !ok { + jobStmtsByTier[tier] = make(map[string]*framework.Statement) + } + selectedTier = tier + // Just cache the allocation result because we haven't chosen the best hyperNode. + jobStmtsByTier[tier][hyperNodeName] = stmt.SaveOperations() + // Rollback current statement and try next hyperNode. + stmt.Discard() + + // If there are still unallocated tasks in the task queue, return and continue scheduling later. 
+ if tasksQueue.Len() > 0 { + hyperNodesWithLeftTasks[hyperNodeName] = tasksQueue + } + } + } + + if len(jobStmtsByTier) > 0 { + hyperNodes := make([]string, 0, len(jobStmtsByTier[selectedTier])) + for hyperNodeName := range jobStmtsByTier[selectedTier] { + hyperNodes = append(hyperNodes, hyperNodeName) + } + klog.V(4).InfoS("Find available hyperNodes for job", "jobName", job.UID, "tier", selectedTier, "hyperNodes", hyperNodes) + } + stmt, hyperNode := alloc.selectBestHyperNode(jobStmtsByTier[selectedTier], job) + return stmt, hyperNodesWithLeftTasks[hyperNode] +} + +// selectBestStmt return a stmt and best hyperNode related to the stmt, it will +// score and select the best hyperNode among all available hyperNodes. +func (alloc *Action) selectBestHyperNode(jobStmts map[string]*framework.Statement, job *api.JobInfo) (*framework.Statement, string) { + var bestStmt *framework.Statement + bestHyperNodeName := "" + ssn := alloc.session + + switch { + case len(jobStmts) == 0: + klog.V(3).InfoS("Failed to allocate resource for job, no available hyperNode is under highest allowed tier", "jobName", job.UID) + return nil, bestHyperNodeName + case len(jobStmts) == 1: + for hyperNodeName, stmt := range jobStmts { + bestStmt = stmt + bestHyperNodeName = hyperNodeName + break + } + case len(jobStmts) > 1: + candidateHyperNodeGroups := make(map[string][]*api.NodeInfo) + for hyperNodeName := range jobStmts { + candidateHyperNodeGroups[hyperNodeName] = ssn.HyperNodes[hyperNodeName] + } + + hyperNodeScores, err := util.PrioritizeHyperNodes(candidateHyperNodeGroups, alloc.hyperNodeScoresByJob[string(job.UID)], job, ssn.HyperNodeOrderMapFn) + if err != nil { + klog.V(3).ErrorS(err, "Failed to allocate resource for job", "jobName", job.UID) + return nil, bestHyperNodeName + } + + bestHyperNodeName = util.SelectBestHyperNode(hyperNodeScores) + + var exists bool + bestStmt, exists = jobStmts[bestHyperNodeName] + if !exists { + klog.ErrorS(nil, "Couldn't find best hyperNode in statements", "jobName", job.UID, "hyperNode", bestHyperNodeName) + return nil, bestHyperNodeName + } + } + + // Recover the stmt and return. + if bestStmt == nil || bestHyperNodeName == "" { + return nil, bestHyperNodeName + } + finalStmt := framework.NewStatement(ssn) + err := finalStmt.RecoverOperations(bestStmt) + if err != nil { + klog.ErrorS(err, "Failed to recover operations", "jobName", job.UID, "hyperNode", bestHyperNodeName) + return nil, bestHyperNodeName + } + klog.V(3).InfoS("Allocate job to hyperNode", "jobName", job.UID, "hyperNode", bestHyperNodeName) + return finalStmt, bestHyperNodeName +} + +func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *api.JobInfo, queue *api.QueueInfo, allNodes []*api.NodeInfo, hyperNode string) *framework.Statement { ssn := alloc.session stmt := framework.NewStatement(ssn) ph := util.NewPredicateHelper() for !tasks.Empty() { task := tasks.Pop().(*api.TaskInfo) - if !ssn.Allocatable(queue, task) { klog.V(3).Infof("Queue <%s> is overused when considering task <%s>, ignore it.", queue.Name, task.Name) continue @@ -228,101 +385,127 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a } } - // Candidate nodes are divided into two gradients: - // - the first gradient node: a list of free nodes that satisfy the task resource request; - // - The second gradient node: the node list whose sum of node idle resources and future idle meets the task resource request; - // Score the first gradient node first. 
If the first gradient node meets the requirements, ignore the second gradient node list, - // otherwise, score the second gradient node and select the appropriate node. - var candidateNodes [][]*api.NodeInfo - var idleCandidateNodes []*api.NodeInfo - var futureIdleCandidateNodes []*api.NodeInfo - for _, n := range predicateNodes { - if task.InitResreq.LessEqual(n.Idle, api.Zero) { - idleCandidateNodes = append(idleCandidateNodes, n) - } else if task.InitResreq.LessEqual(n.FutureIdle(), api.Zero) { - futureIdleCandidateNodes = append(futureIdleCandidateNodes, n) - } else { - klog.V(5).Infof("Predicate filtered node %v, idle: %v and future idle: %v do not meet the requirements of task: %v", - n.Name, n.Idle, n.FutureIdle(), task.Name) - } + bestNode, highestScore := alloc.prioritizeNodes(ssn, task, predicateNodes) + if bestNode == nil { + continue } - candidateNodes = append(candidateNodes, idleCandidateNodes) - candidateNodes = append(candidateNodes, futureIdleCandidateNodes) - var bestNode *api.NodeInfo - for index, nodes := range candidateNodes { - if klog.V(5).Enabled() { - for _, node := range nodes { - klog.V(5).Infof("node %v, idle: %v, future idle: %v", node.Name, node.Idle, node.FutureIdle()) - } - } - switch { - case len(nodes) == 0: - klog.V(5).Infof("Task: %v, no matching node is found in the candidateNodes(index: %d) list.", task.Name, index) - case len(nodes) == 1: // If only one node after predicate, just use it. - bestNode = nodes[0] - case len(nodes) > 1: // If more than one node after predicate, using "the best" one - nodeScores := util.PrioritizeNodes(task, nodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) - - bestNode = ssn.BestNodeFn(task, nodeScores) - if bestNode == nil { - bestNode = util.SelectBestNode(nodeScores) - } - } + alloc.sumNodeScoresInHyperNode(string(job.UID), hyperNode, highestScore) + alloc.allocateResourcesForTask(stmt, task, bestNode, job) - // If a proper node is found in idleCandidateNodes, skip futureIdleCandidateNodes and directly return the node information. - if bestNode != nil { - break - } + if ssn.JobReady(job) && !tasks.Empty() { + break } + } - // Allocate idle resource to the task. - if task.InitResreq.LessEqual(bestNode.Idle, api.Zero) { - klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, bestNode.Name) - if err := stmt.Allocate(task, bestNode); err != nil { - klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v", - task.UID, bestNode.Name, ssn.UID, err) - if rollbackErr := stmt.UnAllocate(task); rollbackErr != nil { - klog.Errorf("Failed to unallocate Task %v on %v in Session %v for %v.", - task.UID, bestNode.Name, ssn.UID, rollbackErr) - } - } else { - metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) - metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) - } + if ssn.JobReady(job) { + klog.V(3).InfoS("Job ready, return statement", "jobName", job.UID) + return stmt + } else { + if !ssn.JobPipelined(job) { + stmt.Discard() + } + return nil + } +} + +func (alloc *Action) sumNodeScoresInHyperNode(jobUID, hyperNode string, score float64) { + // normal vc job without networkTopology has no hyperNode, skip node scores accumulation. 
+ if hyperNode == "" { + return + } + + if alloc.hyperNodeScoresByJob[jobUID] == nil { + alloc.hyperNodeScoresByJob[jobUID] = make(map[string]float64) + } + + alloc.hyperNodeScoresByJob[jobUID][hyperNode] += score +} + +// prioritizeNodes selects the highest score node. +func (alloc *Action) prioritizeNodes(ssn *framework.Session, task *api.TaskInfo, predicateNodes []*api.NodeInfo) (*api.NodeInfo, float64) { + // Candidate nodes are divided into two gradients: + // - the first gradient node: a list of free nodes that satisfy the task resource request; + // - The second gradient node: the node list whose sum of node idle resources and future idle meets the task resource request; + // Score the first gradient node first. If the first gradient node meets the requirements, ignore the second gradient node list, + // otherwise, score the second gradient node and select the appropriate node. + var candidateNodes [][]*api.NodeInfo + var idleCandidateNodes []*api.NodeInfo + var futureIdleCandidateNodes []*api.NodeInfo + for _, n := range predicateNodes { + if task.InitResreq.LessEqual(n.Idle, api.Zero) { + idleCandidateNodes = append(idleCandidateNodes, n) + } else if task.InitResreq.LessEqual(n.FutureIdle(), api.Zero) { + futureIdleCandidateNodes = append(futureIdleCandidateNodes, n) } else { - klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources", - task.Namespace, task.Name, bestNode.Name) - - // Allocate releasing resource to the task if any. - if task.InitResreq.LessEqual(bestNode.FutureIdle(), api.Zero) { - klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>", - task.Namespace, task.Name, bestNode.Name, task.InitResreq, bestNode.Releasing) - if err := stmt.Pipeline(task, bestNode.Name, false); err != nil { - klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.", - task.UID, bestNode.Name, ssn.UID, err) - if rollbackErr := stmt.UnPipeline(task); rollbackErr != nil { - klog.Errorf("Failed to unpipeline Task %v on %v in Session %v for %v.", - task.UID, bestNode.Name, ssn.UID, rollbackErr) - } - } else { - metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) - metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) - } + klog.V(5).Infof("Predicate filtered node %v, idle: %v and future idle: %v do not meet the requirements of task: %v", + n.Name, n.Idle, n.FutureIdle(), task.Name) + } + } + candidateNodes = append(candidateNodes, idleCandidateNodes) + candidateNodes = append(candidateNodes, futureIdleCandidateNodes) + + var bestNode *api.NodeInfo + var higestScore float64 + for index, nodes := range candidateNodes { + if klog.V(5).Enabled() { + for _, node := range nodes { + klog.V(5).Infof("node %v, idle: %v, future idle: %v", node.Name, node.Idle, node.FutureIdle()) + } + } + switch { + case len(nodes) == 0: + klog.V(5).Infof("Task: %v, no matching node is found in the candidateNodes(index: %d) list.", task.Name, index) + case len(nodes) == 1: // If only one node after predicate, just use it. 
+ bestNode = nodes[0] + case len(nodes) > 1: // If more than one node after predicate, using "the best" one + nodeScores := util.PrioritizeNodes(task, nodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) + + bestNode = ssn.BestNodeFn(task, nodeScores) + if bestNode == nil { + bestNode, higestScore = util.SelectBestNodeAndScore(nodeScores) } } - if ssn.JobReady(job) && !tasks.Empty() { - jobs.Push(job) + // If a proper node is found in idleCandidateNodes, skip futureIdleCandidateNodes and directly return the node information. + if bestNode != nil { break } } + return bestNode, higestScore +} - if ssn.JobReady(job) { - stmt.Commit() - } else { - if !ssn.JobPipelined(job) { - stmt.Discard() +func (alloc *Action) allocateResourcesForTask(stmt *framework.Statement, task *api.TaskInfo, node *api.NodeInfo, job *api.JobInfo) { + // Allocate idle resource to the task. + if task.InitResreq.LessEqual(node.Idle, api.Zero) { + klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) + if err := stmt.Allocate(task, node); err != nil { + klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v", + task.UID, node.Name, alloc.session.UID, err) + if rollbackErr := stmt.UnAllocate(task); rollbackErr != nil { + klog.Errorf("Failed to unallocate Task %v on %v in Session %v for %v.", + task.UID, node.Name, alloc.session.UID, rollbackErr) + } + } else { + metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) + metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) + } + return + } + + klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources", + task.Namespace, task.Name, node.Name) + + // Allocate releasing resource to the task if any. 
+ if task.InitResreq.LessEqual(node.FutureIdle(), api.Zero) { + klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>", + task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing) + if err := stmt.Pipeline(task, node.Name, false); err != nil { + klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.", + task.UID, node.Name, alloc.session.UID, err) + } else { + metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) + metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) } } } diff --git a/pkg/scheduler/actions/allocate/allocate_test.go b/pkg/scheduler/actions/allocate/allocate_test.go index 670e19104e..5fead3f8f1 100644 --- a/pkg/scheduler/actions/allocate/allocate_test.go +++ b/pkg/scheduler/actions/allocate/allocate_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" @@ -39,6 +40,7 @@ import ( "volcano.sh/volcano/pkg/scheduler/cache" "volcano.sh/volcano/pkg/scheduler/conf" "volcano.sh/volcano/pkg/scheduler/framework" + "volcano.sh/volcano/pkg/scheduler/plugins/binpack" "volcano.sh/volcano/pkg/scheduler/plugins/drf" "volcano.sh/volcano/pkg/scheduler/plugins/gang" "volcano.sh/volcano/pkg/scheduler/plugins/nodeorder" @@ -95,7 +97,7 @@ func TestAllocate(t *testing.T) { ExpectBindsNum: 1, }, { - Name: "prepredicate failed and tasks are not used up, continue on untill min member meet", + Name: "prepredicate failed and tasks are not used up, continue on until min member meet", PodGroups: []*schedulingv1.PodGroup{ util.BuildPodGroup("pg1", "c1", "c1", 2, map[string]int32{"master": 1, "worker": 1}, schedulingv1.PodGroupInqueue), }, @@ -268,6 +270,295 @@ func TestAllocate(t *testing.T) { } } +func TestAllocateWithNetWorkTopologies(t *testing.T) { + plugins := map[string]framework.PluginBuilder{ + predicates.PluginName: predicates.New, + gang.PluginName: gang.New, + } + + tests := []uthelper.TestCommonStruct{ + { + Name: "soft network topology constrain, can allocate job when resources are enough", + PodGroups: []*schedulingv1.PodGroup{ + util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 3, nil, schedulingv1.PodGroupInqueue, "soft", 1), + }, + Pods: []*v1.Pod{ + // should use different role, because allocate actions default to enable the role caches when predicate + util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil), + util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil), + util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil), + }, + Nodes: []*v1.Node{ + util.BuildNode("s0-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s0-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s1-n3", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s1-n4", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + }, + 
HyperNodesListByTier: map[int][]string{0: {"s0", "s1"}, 1: {"s2"}}, + HyperNodes: map[string]sets.Set[string]{ + "s0": sets.New[string]("s0-n1", "s0-n2"), + "s1": sets.New[string]("s1-n3", "s1-n4"), + "s2": sets.New[string]("s0-n1", "s0-n2", "s1-n3", "s1-n4"), + }, + Queues: []*schedulingv1.Queue{ + util.BuildQueue("q1", 1, nil), + }, + MinimalBindCheck: true, + ExpectBindsNum: 3, + }, + { + Name: "hard network topology constrain, can not allocate job when cross highestTierAllowed tier", + PodGroups: []*schedulingv1.PodGroup{ + util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 3, nil, schedulingv1.PodGroupInqueue, "hard", 1), + }, + Pods: []*v1.Pod{ + // should use different role, because allocate actions default to enable the role caches when predicate + util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil), + util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil), + util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil), + }, + Nodes: []*v1.Node{ + util.BuildNode("s0-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s0-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s1-n3", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s1-n4", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + }, + HyperNodesListByTier: map[int][]string{1: {"s0", "s1"}, 2: {"s2"}}, + HyperNodes: map[string]sets.Set[string]{ + "s0": sets.New[string]("s0-n1", "s0-n2"), + "s1": sets.New[string]("s1-n3", "s1-n4"), + "s2": sets.New[string]("s0-n1", "s0-n2", "s1-n3", "s1-n4"), + }, + Queues: []*schedulingv1.Queue{ + util.BuildQueue("q1", 1, nil), + }, + ExpectBindsNum: 0, + MinimalBindCheck: true, + }, + { + Name: "hard network topology constrain, can allocate job when highestTierAllowed not reached", + PodGroups: []*schedulingv1.PodGroup{ + util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 3, nil, schedulingv1.PodGroupInqueue, "hard", 2), + }, + Pods: []*v1.Pod{ + // should use different role, because allocate actions default to enable the role caches when predicate + util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil), + util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil), + util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil), + }, + Nodes: []*v1.Node{ + util.BuildNode("s0-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s0-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s1-n3", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s1-n4", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + }, + HyperNodesListByTier: map[int][]string{1: {"s0", "s1"}, 2: {"s2"}}, + HyperNodes: 
map[string]sets.Set[string]{ + "s0": sets.New[string]("s0-n1", "s0-n2"), + "s1": sets.New[string]("s1-n3", "s1-n4"), + "s2": sets.New[string]("s0-n1", "s0-n2", "s1-n3", "s1-n4"), + }, + Queues: []*schedulingv1.Queue{ + util.BuildQueue("q1", 1, nil), + }, + ExpectBindsNum: 3, + MinimalBindCheck: true, + }, + { + Name: "hard network topology constrain, can allocate job when multi hyperNodes are available", + PodGroups: []*schedulingv1.PodGroup{ + util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 2, nil, schedulingv1.PodGroupInqueue, "hard", 1), + }, + Pods: []*v1.Pod{ + // should use different role, because allocate actions default to enable the role caches when predicate + util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil), + util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil), + }, + Nodes: []*v1.Node{ + util.BuildNode("s0-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s0-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s1-n3", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s1-n4", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + }, + HyperNodesListByTier: map[int][]string{2: {"s0", "s1"}, 3: {"s2"}}, + HyperNodes: map[string]sets.Set[string]{ + "s0": sets.New[string]("s0-n1", "s0-n2"), + "s1": sets.New[string]("s1-n3", "s1-n4"), + "s2": sets.New[string]("s0-n1", "s0-n2", "s1-n3", "s1-n4"), + }, + Queues: []*schedulingv1.Queue{ + util.BuildQueue("q1", 1, nil), + }, + ExpectBindsNum: 2, + MinimalBindCheck: true, + }, + { + Name: "hard network topology constrain, can allocate job when minavailiable < replicas", + PodGroups: []*schedulingv1.PodGroup{ + util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 1, nil, schedulingv1.PodGroupInqueue, "hard", 1), + }, + Pods: []*v1.Pod{ + // should use different role, because allocate actions default to enable the role caches when predicate + util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil), + util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil), + }, + Nodes: []*v1.Node{ + util.BuildNode("s0-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s0-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s1-n3", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s1-n4", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + }, + HyperNodesListByTier: map[int][]string{1: {"s0", "s1"}, 2: {"s2"}}, + HyperNodes: map[string]sets.Set[string]{ + "s0": sets.New[string]("s0-n1", "s0-n2"), + "s1": sets.New[string]("s1-n3", "s1-n4"), + "s2": sets.New[string]("s0-n1", "s0-n2", "s1-n3", "s1-n4"), + }, + Queues: []*schedulingv1.Queue{ + util.BuildQueue("q1", 1, nil), + }, + ExpectBindsNum: 2, + MinimalBindCheck: true, + }, + { + Name: "hard network topology constrain, two available hyperNodes, can allocate job to 
nodes with affinity", + PodGroups: []*schedulingv1.PodGroup{ + util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 1, nil, schedulingv1.PodGroupInqueue, "hard", 1), + }, + Pods: []*v1.Pod{ + util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}), + }, + Nodes: []*v1.Node{ + util.BuildNode("s0-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s1-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "master"}), + }, + HyperNodesListByTier: map[int][]string{0: {"s0", "s1"}}, + HyperNodes: map[string]sets.Set[string]{ + "s0": sets.New[string]("s0-n1"), + "s1": sets.New[string]("s1-n2"), + }, + Queues: []*schedulingv1.Queue{ + util.BuildQueue("q1", 1, nil), + }, + ExpectBindMap: map[string]string{ + "c1/p1": "s1-n2", + }, + ExpectBindsNum: 1, + }, + } + + trueValue := true + tiers := []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: gang.PluginName, + EnabledJobOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledJobStarving: &trueValue, + }, + { + Name: predicates.PluginName, + EnabledPredicate: &trueValue, + }, + }, + }, + } + for i, test := range tests { + t.Run(test.Name, func(t *testing.T) { + test.Plugins = plugins + test.RegisterSession(tiers, nil) + defer test.Close() + test.Run([]framework.Action{New()}) + if err := test.CheckAll(i); err != nil { + t.Fatal(err) + } + }) + } +} + +func TestNodeLevelScoreWithNetWorkTopologies(t *testing.T) { + plugins := map[string]framework.PluginBuilder{ + predicates.PluginName: predicates.New, + gang.PluginName: gang.New, + binpack.PluginName: binpack.New, + } + + tests := []uthelper.TestCommonStruct{ + { + Name: "hard network topology constrain, allocate job to highest score hypeNode with node level binpack", + PodGroups: []*schedulingv1.PodGroup{ + util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 2, nil, schedulingv1.PodGroupInqueue, "hard", 1), + util.BuildPodGroupWithNetWorkTopologies("pg2", "c1", "q1", 2, nil, schedulingv1.PodGroupRunning, "", 1), + }, + Pods: []*v1.Pod{ + // should use different role, because allocate actions default to enable the role caches when predicate + util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "4Gi"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil), + util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("4", "8Gi"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil), + + util.BuildPod("c1", "p3", "s0-n1", v1.PodRunning, api.BuildResourceList("2", "4Gi"), "pg2", map[string]string{"volcano.sh/task-spec": "master"}, nil), + util.BuildPod("c1", "p4", "s0-n2", v1.PodRunning, api.BuildResourceList("4", "8Gi"), "pg2", map[string]string{"volcano.sh/task-spec": "worker"}, nil), + }, + Nodes: []*v1.Node{ + util.BuildNode("s0-n1", api.BuildResourceList("4", "8Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s0-n2", api.BuildResourceList("8", "16Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s1-n3", api.BuildResourceList("4", "8Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + util.BuildNode("s1-n4", api.BuildResourceList("8", "16Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil), + }, + HyperNodesListByTier: map[int][]string{0: 
{"s0", "s1"}}, + HyperNodes: map[string]sets.Set[string]{ + "s0": sets.New[string]("s0-n1", "s0-n2"), + "s1": sets.New[string]("s1-n3", "s1-n4"), + "s2": sets.New[string]("s0-n1", "s0-n2", "s1-n3", "s1-n4"), + }, + Queues: []*schedulingv1.Queue{ + util.BuildQueue("q1", 1, nil), + }, + ExpectBindsNum: 2, + // "s0-n1" and "s0-n2" nodes have running pods, so get higher score when enable binpack. + ExpectBindMap: map[string]string{ + "c1/p1": "s0-n1", + "c1/p2": "s0-n2", + }, + }, + } + + trueValue := true + tiers := []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: gang.PluginName, + EnabledJobOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledJobStarving: &trueValue, + }, + { + Name: predicates.PluginName, + EnabledPredicate: &trueValue, + }, + { + Name: binpack.PluginName, + EnabledNodeOrder: &trueValue, + }, + }, + }, + } + for i, test := range tests { + t.Run(test.Name, func(t *testing.T) { + test.Plugins = plugins + test.RegisterSession(tiers, nil) + defer test.Close() + test.Run([]framework.Action{New()}) + if err := test.CheckAll(i); err != nil { + t.Fatal(err) + } + }) + } +} + func TestFareShareAllocate(t *testing.T) { plugins := map[string]framework.PluginBuilder{ drf.PluginName: drf.New, diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 7ee7783c35..d555a85da3 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -85,7 +85,7 @@ func (backfill *Action) Execute(ssn *framework.Session) { nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) node = ssn.BestNodeFn(task, nodeScores) if node == nil { - node = util.SelectBestNode(nodeScores) + node, _ = util.SelectBestNodeAndScore(nodeScores) } } diff --git a/pkg/scheduler/api/cluster_info.go b/pkg/scheduler/api/cluster_info.go index cc43805a90..c5a3153c71 100644 --- a/pkg/scheduler/api/cluster_info.go +++ b/pkg/scheduler/api/cluster_info.go @@ -18,17 +18,21 @@ package api import ( "fmt" + + "k8s.io/apimachinery/pkg/util/sets" ) // ClusterInfo is a snapshot of cluster by cache. type ClusterInfo struct { - Jobs map[JobID]*JobInfo - Nodes map[string]*NodeInfo - Queues map[QueueID]*QueueInfo - NamespaceInfo map[NamespaceName]*NamespaceInfo - RevocableNodes map[string]*NodeInfo - NodeList []string - CSINodesStatus map[string]*CSINodeStatusInfo + Jobs map[JobID]*JobInfo + Nodes map[string]*NodeInfo + HyperNodesListByTier map[int][]string + HyperNodes map[string]sets.Set[string] + Queues map[QueueID]*QueueInfo + NamespaceInfo map[NamespaceName]*NamespaceInfo + RevocableNodes map[string]*NodeInfo + NodeList []string + CSINodesStatus map[string]*CSINodeStatusInfo } func (ci ClusterInfo) String() string { diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go index d8284d5523..222416d1bb 100644 --- a/pkg/scheduler/api/job_info.go +++ b/pkg/scheduler/api/job_info.go @@ -1052,3 +1052,18 @@ func (ji *JobInfo) IsPending() bool { func (ji *JobInfo) HasPendingTasks() bool { return len(ji.TaskStatusIndex[Pending]) != 0 } + +// HasTopologyHardConstrain return pg's NetworkTopologies mode and highest allowed tier. 
+func (ji *JobInfo) HasTopologyHardConstrain() (bool, int) { + if ji.PodGroup == nil || ji.PodGroup.Spec.NetworkTopology == nil || ji.PodGroup.Spec.NetworkTopology.HighestTierAllowed == nil { + return false, 0 + } + + return ji.PodGroup.Spec.NetworkTopology.Mode == scheduling.HardNetworkTopologyMode, *ji.PodGroup.Spec.NetworkTopology.HighestTierAllowed +} + +// ResetFitErr will set job and node fit err to nil. +func (ji *JobInfo) ResetFitErr() { + ji.JobFitErrors = "" + ji.NodesFitErrors = make(map[TaskID]*FitErrors) +} diff --git a/pkg/scheduler/api/job_info_test.go b/pkg/scheduler/api/job_info_test.go index 204c32167a..2791736641 100644 --- a/pkg/scheduler/api/job_info_test.go +++ b/pkg/scheduler/api/job_info_test.go @@ -440,3 +440,79 @@ func TestGetElasticResources(t *testing.T) { } } } + +func TestHasTopologyHardConstrain(t *testing.T) { + HighestTierAllowedTwo := 2 + HighestTierAllowedThree := 3 + tests := []struct { + name string + jobInfo *JobInfo + expectedHasHard bool + expectedTier int + }{ + { + name: "Nil PodGroup", + jobInfo: &JobInfo{ + PodGroup: nil, + }, + expectedHasHard: false, + expectedTier: 0, + }, + { + name: "Nil NetworkTopologies", + jobInfo: &JobInfo{ + PodGroup: &PodGroup{ + PodGroup: scheduling.PodGroup{ + Spec: scheduling.PodGroupSpec{ + NetworkTopology: nil, + }, + }, + }, + }, + expectedHasHard: false, + expectedTier: 0, + }, + { + name: "Hard Mode", + jobInfo: &JobInfo{ + PodGroup: &PodGroup{ + PodGroup: scheduling.PodGroup{ + Spec: scheduling.PodGroupSpec{ + NetworkTopology: &scheduling.NetworkTopologySpec{ + Mode: scheduling.HardNetworkTopologyMode, + HighestTierAllowed: &HighestTierAllowedTwo, + }, + }, + }, + }, + }, + expectedHasHard: true, + expectedTier: 2, + }, + { + name: "Soft Mode", + jobInfo: &JobInfo{ + PodGroup: &PodGroup{ + PodGroup: scheduling.PodGroup{ + Spec: scheduling.PodGroupSpec{ + NetworkTopology: &scheduling.NetworkTopologySpec{ + Mode: scheduling.SoftNetworkTopologyMode, + HighestTierAllowed: &HighestTierAllowedThree, + }, + }, + }, + }, + }, + expectedHasHard: false, + expectedTier: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hasHard, tier := tt.jobInfo.HasTopologyHardConstrain() + assert.Equal(t, tt.expectedHasHard, hasHard) + assert.Equal(t, tt.expectedTier, tier) + }) + } +} diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index 6e8c920b35..c5e750e7cc 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -288,6 +288,9 @@ type EvictableFn func(*TaskInfo, []*TaskInfo) ([]*TaskInfo, int) // NodeOrderFn is the func declaration used to get priority score for a node for a particular task. type NodeOrderFn func(*TaskInfo, *NodeInfo) (float64, error) +// HyperNodeOrderFn is the func declaration used to score hyperNodes for job. +type HyperNodeOrderFn func(*JobInfo, map[string][]*NodeInfo) (map[string]float64, error) + // BatchNodeOrderFn is the func declaration used to get priority score for ALL nodes for a particular task. type BatchNodeOrderFn func(*TaskInfo, []*NodeInfo) (map[string]float64, error) @@ -300,6 +303,9 @@ type NodeReduceFn func(*TaskInfo, k8sframework.NodeScoreList) error // NodeOrderMapFn is the func declaration used to get priority score of all plugins for a node for a particular task. type NodeOrderMapFn func(*TaskInfo, *NodeInfo) (map[string]float64, float64, error) +// HyperNodeOrderMapFn is the func declaration used to get priority score of all plugins for a hyperNode for a particular job. 
+type HyperNodeOrderMapFn func(*JobInfo, map[string][]*NodeInfo) (map[string]map[string]float64, error) + // NodeOrderReduceFn is the func declaration used to reduce priority score of all nodes for a plugin for a particular task. type NodeOrderReduceFn func(*TaskInfo, map[string]k8sframework.NodeScoreList) (map[string]float64, error) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index d0cf336bd8..6ba27cc3a8 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -135,6 +135,8 @@ type SchedulerCache struct { defaultPriorityClass *schedulingv1.PriorityClass defaultPriority int32 CSINodesStatus map[string]*schedulingapi.CSINodeStatusInfo + HyperNodesListByTier map[int][]string + HyperNodes map[string]sets.Set[string] NamespaceCollection map[string]*schedulingapi.NamespaceCollection @@ -581,6 +583,10 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu CSINodesStatus: make(map[string]*schedulingapi.CSINodeStatusInfo), imageStates: make(map[string]*imageState), + // HyperNode info + HyperNodesListByTier: make(map[int][]string), + HyperNodes: make(map[string]sets.Set[string]), + NodeList: []string{}, nodeWorkers: nodeWorkers, } @@ -1328,13 +1334,15 @@ func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo { defer sc.Mutex.Unlock() snapshot := &schedulingapi.ClusterInfo{ - Nodes: make(map[string]*schedulingapi.NodeInfo), - Jobs: make(map[schedulingapi.JobID]*schedulingapi.JobInfo), - Queues: make(map[schedulingapi.QueueID]*schedulingapi.QueueInfo), - NamespaceInfo: make(map[schedulingapi.NamespaceName]*schedulingapi.NamespaceInfo), - RevocableNodes: make(map[string]*schedulingapi.NodeInfo), - NodeList: make([]string, len(sc.NodeList)), - CSINodesStatus: make(map[string]*schedulingapi.CSINodeStatusInfo), + Nodes: make(map[string]*schedulingapi.NodeInfo), + HyperNodesListByTier: make(map[int][]string), + HyperNodes: make(map[string]sets.Set[string]), + Jobs: make(map[schedulingapi.JobID]*schedulingapi.JobInfo), + Queues: make(map[schedulingapi.QueueID]*schedulingapi.QueueInfo), + NamespaceInfo: make(map[schedulingapi.NamespaceName]*schedulingapi.NamespaceInfo), + RevocableNodes: make(map[string]*schedulingapi.NodeInfo), + NodeList: make([]string, len(sc.NodeList)), + CSINodesStatus: make(map[string]*schedulingapi.CSINodeStatusInfo), } copy(snapshot.NodeList, sc.NodeList) @@ -1358,6 +1366,22 @@ func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo { } } + // Snapshot hyperNodes. 
+	copiedHyperNodesListByTier := make(map[int][]string, len(sc.HyperNodesListByTier))
+	for tier, row := range sc.HyperNodesListByTier {
+		copiedHyperNodesListByTier[tier] = make([]string, len(row))
+		copy(copiedHyperNodesListByTier[tier], row)
+	}
+
+	hyperNodeLength := make(map[string]int)
+	snapshot.HyperNodesListByTier = copiedHyperNodesListByTier
+	copiedHyperNodes := make(map[string]sets.Set[string], len(sc.HyperNodes))
+	for name, value := range sc.HyperNodes {
+		copiedHyperNodes[name] = value.Clone()
+		hyperNodeLength[name] = value.Len()
+	}
+	snapshot.HyperNodes = copiedHyperNodes
+
 	for _, value := range sc.Queues {
 		snapshot.Queues[value.UID] = value.Clone()
 	}
@@ -1411,8 +1435,8 @@ func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo {
 	}
 	wg.Wait()
 
-	klog.V(3).Infof("There are <%d> Jobs, <%d> Queues and <%d> Nodes in total for scheduling.",
-		len(snapshot.Jobs), len(snapshot.Queues), len(snapshot.Nodes))
+	klog.V(3).InfoS("Snapshot for scheduling", "jobNum", len(snapshot.Jobs), "queueNum",
+		len(snapshot.Queues), "nodeNum", len(snapshot.Nodes), "tiers", snapshot.HyperNodesListByTier, "hyperNodeNum", hyperNodeLength)
 
 	return snapshot
 }
diff --git a/pkg/scheduler/cache/dumper.go b/pkg/scheduler/cache/dumper.go
index d5ff3ff123..502667bc0e 100644
--- a/pkg/scheduler/cache/dumper.go
+++ b/pkg/scheduler/cache/dumper.go
@@ -27,6 +27,7 @@ import (
 	"syscall"
 	"time"
 
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2"
 
 	"volcano.sh/volcano/pkg/scheduler/api"
@@ -52,7 +53,8 @@ func (d *Dumper) dumpToJSONFile() {
 	}
 	defer file.Close()
 	klog.Infoln("Starting to dump info in scheduler cache to file", fName)
-	if err = json.NewEncoder(file).Encode(snapshot.Nodes); err != nil {
+
+	if err := encodeCache(file, snapshot.Nodes, snapshot.HyperNodesListByTier, snapshot.HyperNodes, snapshot.Jobs); err != nil {
 		klog.Errorf("Failed to dump info in scheduler cache, json encode error: %v", err)
 		return
 	}
@@ -60,6 +62,16 @@ func (d *Dumper) dumpToJSONFile() {
 	klog.Infoln("Successfully dump info in scheduler cache to file", fName)
 }
 
+func encodeCache(file *os.File, v ...interface{}) error {
+	for _, item := range v {
+		err := json.NewEncoder(file).Encode(item)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // dumpAll prints all information to log
 func (d *Dumper) dumpAll() {
 	snapshot := d.Cache.Snapshot()
@@ -73,6 +85,9 @@ func (d *Dumper) dumpAll() {
 		klog.Info(d.printJobInfo(jobInfo))
 	}
 
+	klog.Info("Dump of hyperNodes info in scheduler cache")
+	d.printHyperNodeInfo(snapshot.HyperNodesListByTier, snapshot.HyperNodes)
+
 	d.displaySchedulerMemStats()
 }
 
@@ -98,6 +113,17 @@ func (d *Dumper) printJobInfo(jobInfo *api.JobInfo) string {
 	return data.String()
 }
 
+func (d *Dumper) printHyperNodeInfo(HyperNodesListByTier map[int][]string, HyperNodes map[string]sets.Set[string]) {
+	var data strings.Builder
+	data.WriteString("\n")
+	for tier, hyperNodes := range HyperNodesListByTier {
+		for _, hyperNode := range hyperNodes {
+			data.WriteString(fmt.Sprintf("Tier: %d, HyperNodeName: %s, Nodes: %s\n", tier, hyperNode, HyperNodes[hyperNode]))
+		}
+	}
+	klog.Info(data.String())
+}
+
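Reviewer note (illustrative sketch, not part of the patch): the two hyperNode maps snapshotted and dumped above have a simple shape. The snippet below shows what they might hold for a small two-tier topology; the tier numbers and the s0/s1/s2 and s*-n* names mirror the test fixtures in this change and are not a required naming scheme.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Tier 1 holds the leaf hyperNodes, tier 2 a hyperNode spanning both of them.
	hyperNodesListByTier := map[int][]string{
		1: {"s0", "s1"},
		2: {"s2"},
	}
	// Each hyperNode maps to the set of node names it contains.
	hyperNodes := map[string]sets.Set[string]{
		"s0": sets.New[string]("s0-n1", "s0-n2"),
		"s1": sets.New[string]("s1-n3", "s1-n4"),
		"s2": sets.New[string]("s0-n1", "s0-n2", "s1-n3", "s1-n4"),
	}
	fmt.Println(hyperNodesListByTier, hyperNodes)
}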
 // ListenForSignal starts a goroutine that will respond when process
 // receives SIGUSER1/SIGUSER2 signal.
 func (d *Dumper) ListenForSignal(stopCh <-chan struct{}) {
diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go
index e28a08345e..ebacefb696 100644
--- a/pkg/scheduler/framework/session.go
+++ b/pkg/scheduler/framework/session.go
@@ -75,6 +75,12 @@ type Session struct {
 	Tiers          []conf.Tier
 	Configurations []conf.Configuration
 	NodeList       []*api.NodeInfo
+	// HyperNodesListByTier lists the hyperNodes at each tier, from the lowest tier to the highest. Nodes under
+	// the same hyperNode share a topology domain, e.g. they sit under the same switch or ToR, so a job placed
+	// within a single hyperNode gains better performance; the lower the tier, the better the performance.
+	HyperNodesListByTier map[int][]string
+	// HyperNodes maps a hyperNode's name to the nodes it contains.
+	HyperNodes map[string][]*api.NodeInfo
 
 	plugins           map[string]Plugin
 	eventHandlers     []*EventHandler
@@ -90,6 +96,7 @@ type Session struct {
 	batchNodeOrderFns map[string]api.BatchNodeOrderFn
 	nodeMapFns        map[string]api.NodeMapFn
 	nodeReduceFns     map[string]api.NodeReduceFn
+	hyperNodeOrderFns map[string]api.HyperNodeOrderFn
 	preemptableFns    map[string]api.EvictableFn
 	reclaimableFns    map[string]api.EvictableFn
 	overusedFns       map[string]api.ValidateFn
@@ -141,6 +148,7 @@ func openSession(cache cache.Cache) *Session {
 		batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
 		nodeMapFns:        map[string]api.NodeMapFn{},
 		nodeReduceFns:     map[string]api.NodeReduceFn{},
+		hyperNodeOrderFns: map[string]api.HyperNodeOrderFn{},
 		preemptableFns:    map[string]api.EvictableFn{},
 		reclaimableFns:    map[string]api.EvictableFn{},
 		overusedFns:       map[string]api.ValidateFn{},
@@ -185,6 +193,8 @@ func openSession(cache cache.Cache) *Session {
 		}
 	}
 	ssn.NodeList = util.GetNodeList(snapshot.Nodes, snapshot.NodeList)
+	ssn.HyperNodesListByTier = snapshot.HyperNodesListByTier
+	ssn.HyperNodes = util.GetHyperNodeList(snapshot.HyperNodes, snapshot.Nodes)
 	ssn.Nodes = snapshot.Nodes
 	ssn.CSINodesStatus = snapshot.CSINodesStatus
 	ssn.RevocableNodes = snapshot.RevocableNodes
diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go
index 150681f03c..c325d5c226 100644
--- a/pkg/scheduler/framework/session_plugins.go
+++ b/pkg/scheduler/framework/session_plugins.go
@@ -90,6 +90,11 @@ func (ssn *Session) AddNodeOrderFn(name string, pf api.NodeOrderFn) {
 	ssn.nodeOrderFns[name] = pf
 }
 
+// AddHyperNodeOrederFn adds a hyperNode order function for the named plugin
+func (ssn *Session) AddHyperNodeOrederFn(name string, fn api.HyperNodeOrderFn) {
+	ssn.hyperNodeOrderFns[name] = fn
+}
+
 // AddBatchNodeOrderFn add Batch Node order function
 func (ssn *Session) AddBatchNodeOrderFn(name string, pf api.BatchNodeOrderFn) {
 	ssn.batchNodeOrderFns[name] = pf
@@ -784,6 +789,26 @@ func (ssn *Session) NodeOrderMapFn(task *api.TaskInfo, node *api.NodeInfo) (map[
 	return nodeScoreMap, priorityScore, nil
 }
 
+// HyperNodeOrderMapFn invokes the hyperNode order functions of all enabled plugins and collects the scores per plugin
+func (ssn *Session) HyperNodeOrderMapFn(job *api.JobInfo, hyperNodes map[string][]*api.NodeInfo) (map[string]map[string]float64, error) {
+	nodeGroupScore := make(map[string]map[string]float64)
+	for _, tier := range ssn.Tiers {
+		for _, plugin := range tier.Plugins {
+			pfn, found := ssn.hyperNodeOrderFns[plugin.Name]
+			if !found {
+				continue
+			}
+			scoreTmp, err := pfn(job, hyperNodes)
+			if err != nil {
+				return nodeGroupScore, err
+			}
+
+			nodeGroupScore[plugin.Name] = scoreTmp
+		}
+	}
+	return nodeGroupScore, nil
+}
+
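Reviewer note (illustrative sketch, not part of the patch): a plugin opts into the new extension point by registering a HyperNodeOrderFn during session open; HyperNodeOrderMapFn then collects the returned scores under the plugin's name. The snippet below assumes a hypothetical plugin and a made-up scoring rule, only the registration call and the function signature come from this patch.

package demo

import (
	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/framework"
)

// registerHyperNodeOrder sketches what a plugin's OnSessionOpen might do; the
// "prefer smaller hyperNodes" rule is purely illustrative.
func registerHyperNodeOrder(ssn *framework.Session, pluginName string) {
	ssn.AddHyperNodeOrederFn(pluginName, func(job *api.JobInfo, hyperNodes map[string][]*api.NodeInfo) (map[string]float64, error) {
		scores := make(map[string]float64, len(hyperNodes))
		for name, nodes := range hyperNodes {
			// Score each candidate hyperNode; here, tighter topology domains win.
			scores[name] = 100.0 / float64(1+len(nodes))
		}
		return scores, nil
	})
}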
 // NodeOrderReduceFn invoke node order function of the plugins
 func (ssn *Session) NodeOrderReduceFn(task *api.TaskInfo, pluginNodeScoreMap map[string]k8sframework.NodeScoreList) (map[string]float64, error) {
 	nodeScoreMap := map[string]float64{}
diff --git a/pkg/scheduler/framework/statement.go b/pkg/scheduler/framework/statement.go
index 5a49ab1a35..8fe0c0d911 100644
--- a/pkg/scheduler/framework/statement.go
+++ b/pkg/scheduler/framework/statement.go
@@ -17,6 +17,7 @@ limitations under the License.
 package framework
 
 import (
+	"errors"
 	"fmt"
 
 	"k8s.io/klog/v2"
@@ -440,3 +441,73 @@ func (s *Statement) Commit() {
 		}
 	}
 }
+
+// SaveOperations returns a new Statement that holds a cloned copy of the operations recorded so far.
+func (s *Statement) SaveOperations() *Statement {
+	s.outputOperations("Save operations: ", 4)
+
+	stmtTmp := &Statement{}
+	for _, op := range s.operations {
+		stmtTmp.operations = append(stmtTmp.operations, operation{
+			name:   op.name,
+			task:   op.task.Clone(),
+			reason: op.reason,
+		})
+	}
+	return stmtTmp
+}
+
+// RecoverOperations replays the operations recorded in stmt onto the current statement.
+func (s *Statement) RecoverOperations(stmt *Statement) error {
+	if stmt == nil {
+		return errors.New("statement is nil")
+	}
+	s.outputOperations("Recover operations: ", 4)
+	for _, op := range stmt.operations {
+		switch op.name {
+		case Evict:
+			err := s.Evict(op.task, op.reason)
+			if err != nil {
+				klog.Errorf("Failed to evict task: %s", err.Error())
+				return err
+			}
+		case Pipeline:
+			err := s.Pipeline(op.task, op.task.NodeName, false)
+			if err != nil {
+				klog.Errorf("Failed to pipeline task: %s", err.Error())
+				return err
+			}
+		case Allocate:
+			node := s.ssn.Nodes[op.task.NodeName]
+			err := s.Allocate(op.task, node)
+			if err != nil {
+				if e := s.unallocate(op.task); e != nil {
+					klog.Errorf("Failed to unallocate task <%v/%v>: %v", op.task.Namespace, op.task.Name, e)
+				}
+				klog.Errorf("Failed to allocate task <%v/%v>: %v", op.task.Namespace, op.task.Name, err)
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// outputOperations logs the statement's recorded operations at the given verbosity level.
+func (s *Statement) outputOperations(msg string, level klog.Level) {
+	if !klog.V(level).Enabled() {
+		return
+	}
+
+	var buffer string
+	for _, op := range s.operations {
+		switch op.name {
+		case Evict:
+			buffer += fmt.Sprintf("task %s evict from node %s ", op.task.Name, op.task.NodeName)
+		case Pipeline:
+			buffer += fmt.Sprintf("task %s pipeline to node %s ", op.task.Name, op.task.NodeName)
+		case Allocate:
+			buffer += fmt.Sprintf("task %s allocate to node %s ", op.task.Name, op.task.NodeName)
+		}
+	}
+	klog.V(level).Info(msg, buffer)
+}
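Reviewer note (illustrative sketch, not part of the patch): SaveOperations and RecoverOperations are meant to be used as a pair, e.g. to remember a trial placement and replay it later. A minimal sketch, assuming the caller already holds the two statements; the surrounding control flow in the allocate action is omitted and the function name is hypothetical.

package demo

import "volcano.sh/volcano/pkg/scheduler/framework"

// trySaveAndRecover records the Evict/Pipeline/Allocate operations made on one
// statement and replays them onto another.
func trySaveAndRecover(current, next *framework.Statement) error {
	// SaveOperations clones every task in the recorded operations, so the copy
	// stays valid even if the original statement is discarded afterwards.
	saved := current.SaveOperations()

	// RecoverOperations re-applies the saved operations in order and returns the
	// first error it hits.
	return next.RecoverOperations(saved)
}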
diff --git a/pkg/scheduler/uthelper/helper.go b/pkg/scheduler/uthelper/helper.go
index b1722a60f0..61a2e72d1e 100644
--- a/pkg/scheduler/uthelper/helper.go
+++ b/pkg/scheduler/uthelper/helper.go
@@ -23,6 +23,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	schedulingv1 "k8s.io/api/scheduling/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
+	"k8s.io/apimachinery/pkg/util/sets"
 
 	"volcano.sh/apis/pkg/apis/scheduling"
 	vcapisv1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
@@ -48,12 +49,14 @@ type TestCommonStruct struct {
 	// Plugins plugins for each case
 	Plugins map[string]framework.PluginBuilder
 	// Resource objects that need to be added to schedulercache
-	Pods           []*v1.Pod
-	Nodes          []*v1.Node
-	PodGroups      []*vcapisv1.PodGroup
-	Queues         []*vcapisv1.Queue
-	PriClass       []*schedulingv1.PriorityClass
-	ResourceQuotas []*v1.ResourceQuota
+	Pods                 []*v1.Pod
+	Nodes                []*v1.Node
+	HyperNodesListByTier map[int][]string
+	HyperNodes           map[string]sets.Set[string]
+	PodGroups            []*vcapisv1.PodGroup
+	Queues               []*vcapisv1.Queue
+	PriClass             []*schedulingv1.PriorityClass
+	ResourceQuotas       []*v1.ResourceQuota
+	// ExpectBindMap the expected bind results.
 	// bind results: ns/podName -> nodeName
@@ -71,6 +74,9 @@ type TestCommonStruct struct {
 	// ExpectEvictNum the expected evict events numbers, include preempted and reclaimed evict events
 	ExpectEvictNum int
 
+	// MinimalBindCheck, when true, only verifies the number of binds and skips comparing the bind map; false by default.
+	MinimalBindCheck bool
+
 	// fake interface instance when check results need
 	stop   chan struct{}
 	binder cache.Binder
@@ -122,6 +128,8 @@ func (test *TestCommonStruct) createSchedulerCache() *cache.SchedulerCache {
 	for _, rq := range test.ResourceQuotas {
 		schedulerCache.AddResourceQuota(rq)
 	}
+	schedulerCache.HyperNodesListByTier = test.HyperNodesListByTier
+	schedulerCache.HyperNodes = test.HyperNodes
 	return schedulerCache
 }
 
@@ -168,9 +176,10 @@ func (test *TestCommonStruct) CheckAll(caseIndex int) (err error) {
 
 // CheckBind check expected bind result
 func (test *TestCommonStruct) CheckBind(caseIndex int) error {
-	if test.ExpectBindsNum != len(test.ExpectBindMap) {
+	if test.ExpectBindsNum != len(test.ExpectBindMap) && !test.MinimalBindCheck {
 		return fmt.Errorf("invalid setting for binding check: want bind count %d, want bind result length %d", test.ExpectBindsNum, len(test.ExpectBindMap))
 	}
+
 	binder := test.binder.(*util.FakeBinder)
 	for i := 0; i < test.ExpectBindsNum; i++ {
 		select {
@@ -187,6 +196,10 @@ func (test *TestCommonStruct) CheckBind(caseIndex int) error {
 		return fmt.Errorf("unexpect binding %s in case %d(%s)", key, caseIndex, test.Name)
 	}
 
+	if test.MinimalBindCheck {
+		return nil
+	}
+
 	binds := binder.Binds()
 	if len(test.ExpectBindMap) != len(binds) {
 		return fmt.Errorf("case %d(%s) check bind: \nwant: %v\n got %v ", caseIndex, test.Name, test.ExpectBindMap, binds)
diff --git a/pkg/scheduler/util/priority_queue.go b/pkg/scheduler/util/priority_queue.go
index c30311ed61..66d3ff130c 100644
--- a/pkg/scheduler/util/priority_queue.go
+++ b/pkg/scheduler/util/priority_queue.go
@@ -66,6 +66,21 @@ func (q *PriorityQueue) Len() int {
 	return q.queue.Len()
 }
 
+// Clone returns a copy of the priority queue that reuses the less function but holds its own items.
+func (q *PriorityQueue) Clone() *PriorityQueue {
+	newPq := &PriorityQueue{
+		queue: priorityQueue{
+			items:  make([]interface{}, 0),
+			lessFn: q.queue.lessFn,
+		},
+	}
+
+	for _, it := range q.queue.items {
+		newPq.Push(it)
+	}
+	return newPq
+}
+
 func (pq *priorityQueue) Len() int { return len(pq.items) }
 
 func (pq *priorityQueue) Less(i, j int) bool {
diff --git a/pkg/scheduler/util/scheduler_helper.go b/pkg/scheduler/util/scheduler_helper.go
index 64f259ddbb..f8a80e98b2 100644
--- a/pkg/scheduler/util/scheduler_helper.go
+++ b/pkg/scheduler/util/scheduler_helper.go
@@ -26,6 +26,7 @@ import (
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 	k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
@@ -126,6 +127,50 @@ func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, batchFn api.Batc
 	return nodeScores
 }
 
+// PrioritizeHyperNodes returns a map from score to the hyperNodes that received that score.
+// It accumulates two kinds of scores for each hyperNode:
+// 1. node-level scores of the hyperNode's nodes collected in the NodeOrder extension.
+// 2. hyperNode-level scores produced in the HyperNodeOrder extension.
+func PrioritizeHyperNodes(candidateHyperNodes map[string][]*api.NodeInfo, nodeScoresInHyperNode map[string]float64, job *api.JobInfo, fn api.HyperNodeOrderMapFn) (map[float64][]string, error) {
+	hyperNodesScoreMap := make(map[string]float64)
+	mapScores, err := fn(job, candidateHyperNodes)
+	if err != nil {
+		return nil, err
+	}
+
+	// accumulate each plugin's scores for every hyperNode.
+ for pluginName, scores := range mapScores { + for hyperNode, score := range scores { + klog.V(5).InfoS("Add plugin score at hypeNode", "jobName", job.UID, "pluginName", pluginName, "hyperNodeName", hyperNode, "score", score) + hyperNodesScoreMap[hyperNode] += score + } + } + + // accumulate node scores in NodeOrder and hyperNode score itself as the final score of each hyperNode. + for hyperNodeName, score := range nodeScoresInHyperNode { + klog.V(5).InfoS("Add node level scores to final hyperNode score", "jobName", job.UID, "hyperNodeName", hyperNodeName, "score", score) + hyperNodesScoreMap[hyperNodeName] += score + } + + hyperNodeScores := make(map[float64][]string) + hyperNodeScoreMap := make(map[string]float64) + for hyperNodeName := range candidateHyperNodes { + // If no plugin is applied to this node, the default is 0.0 + score := 0.0 + if value, ok := hyperNodesScoreMap[hyperNodeName]; ok { + score += value + } + hyperNodeScores[score] = append(hyperNodeScores[score], hyperNodeName) + + if klog.V(5).Enabled() { + hyperNodeScoreMap[hyperNodeName] = score + } + } + + klog.V(5).InfoS("Prioritize hyperNode score map for job", "jobName", job.UID, "scoreMap", hyperNodeScoreMap) + return hyperNodeScores, nil +} + // SortNodes returns nodes by order of score func SortNodes(nodeScores map[float64][]*api.NodeInfo) []*api.NodeInfo { var nodesInorder []*api.NodeInfo @@ -141,8 +186,8 @@ func SortNodes(nodeScores map[float64][]*api.NodeInfo) []*api.NodeInfo { return nodesInorder } -// SelectBestNode returns best node whose score is highest, pick one randomly if there are many nodes with same score. -func SelectBestNode(nodeScores map[float64][]*api.NodeInfo) *api.NodeInfo { +// SelectBestNodeAndScore returns the best node whose score is highest and the highest score, pick one randomly if there are many nodes with same score. +func SelectBestNodeAndScore(nodeScores map[float64][]*api.NodeInfo) (*api.NodeInfo, float64) { var bestNodes []*api.NodeInfo maxScore := -1.0 for score, nodes := range nodeScores { @@ -153,10 +198,28 @@ func SelectBestNode(nodeScores map[float64][]*api.NodeInfo) *api.NodeInfo { } if len(bestNodes) == 0 { - return nil + return nil, 0 } - return bestNodes[rand.Intn(len(bestNodes))] + return bestNodes[rand.Intn(len(bestNodes))], maxScore +} + +// SelectBestHyperNode return the best hyperNode name whose score is highest, pick one randomly if there are many hyperNodes with same score. +func SelectBestHyperNode(hyperNodeScores map[float64][]string) string { + var bestHyperNodes []string + maxScore := -1.0 + for score, hyperNodes := range hyperNodeScores { + if score > maxScore { + maxScore = score + bestHyperNodes = hyperNodes + } + } + + if len(bestHyperNodes) == 0 { + return "" + } + + return bestHyperNodes[rand.Intn(len(bestHyperNodes))] } // GetNodeList returns values of the map 'nodes' @@ -170,6 +233,20 @@ func GetNodeList(nodes map[string]*api.NodeInfo, nodeList []string) []*api.NodeI return result } +// GetHyperNodeList returns values of the map 'hyperNodes'. 
+func GetHyperNodeList(hyperNodes map[string]sets.Set[string], allNodes map[string]*api.NodeInfo) map[string][]*api.NodeInfo { + result := make(map[string][]*api.NodeInfo) + for hyperNodeName, nodes := range hyperNodes { + result[hyperNodeName] = make([]*api.NodeInfo, 0, len(nodes)) + for node := range nodes { + if ni, ok := allNodes[node]; ok { + result[hyperNodeName] = append(result[hyperNodeName], ni) + } + } + } + return result +} + // ValidateVictims returns an error if the resources of the victims can't satisfy the preemptor func ValidateVictims(preemptor *api.TaskInfo, node *api.NodeInfo, victims []*api.TaskInfo) error { // Victims should not be judged to be empty here. diff --git a/pkg/scheduler/util/scheduler_helper_test.go b/pkg/scheduler/util/scheduler_helper_test.go index cf7aedf76a..595a7a0c12 100644 --- a/pkg/scheduler/util/scheduler_helper_test.go +++ b/pkg/scheduler/util/scheduler_helper_test.go @@ -19,7 +19,10 @@ package util import ( "testing" + "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/util/sets" + "volcano.sh/volcano/cmd/scheduler/app/options" "volcano.sh/volcano/pkg/scheduler/api" ) @@ -29,6 +32,7 @@ func TestSelectBestNode(t *testing.T) { NodeScores map[float64][]*api.NodeInfo // Expected node is one of ExpectedNodes ExpectedNodes []*api.NodeInfo + ExpectedScore float64 }{ { NodeScores: map[float64][]*api.NodeInfo{ @@ -36,6 +40,7 @@ func TestSelectBestNode(t *testing.T) { 2.0: {&api.NodeInfo{Name: "node3"}, &api.NodeInfo{Name: "node4"}}, }, ExpectedNodes: []*api.NodeInfo{{Name: "node3"}, {Name: "node4"}}, + ExpectedScore: 2.0, }, { NodeScores: map[float64][]*api.NodeInfo{ @@ -44,6 +49,7 @@ func TestSelectBestNode(t *testing.T) { 2.0: {&api.NodeInfo{Name: "node4"}, &api.NodeInfo{Name: "node5"}}, }, ExpectedNodes: []*api.NodeInfo{{Name: "node3"}}, + ExpectedScore: 3.0, }, { NodeScores: map[float64][]*api.NodeInfo{}, @@ -60,10 +66,13 @@ func TestSelectBestNode(t *testing.T) { return false } for i, test := range cases { - result := SelectBestNode(test.NodeScores) + result, score := SelectBestNodeAndScore(test.NodeScores) if !oneOf(result, test.ExpectedNodes) { t.Errorf("Failed test case #%d, expected: %#v, got %#v", i, test.ExpectedNodes, result) } + if score != test.ExpectedScore { + t.Errorf("Failed test case #%d, expected: %#v, got %#v", i, test.ExpectedScore, score) + } } } @@ -155,3 +164,79 @@ func TestNumFeasibleNodesToFind(t *testing.T) { }) } } + +func TestGetHyperNodeList(t *testing.T) { + testCases := []struct { + name string + hyperNodes map[string]sets.Set[string] + allNodes map[string]*api.NodeInfo + expected map[string][]*api.NodeInfo + }{ + { + name: "Normal case", + hyperNodes: map[string]sets.Set[string]{ + "hyperNode1": sets.New[string]("node1", "node2"), + "hyperNode2": sets.New[string]("node3"), + }, + allNodes: map[string]*api.NodeInfo{ + "node1": {Name: "node1"}, + "node2": {Name: "node2"}, + "node3": {Name: "node3"}, + }, + expected: map[string][]*api.NodeInfo{ + "hyperNode1": { + {Name: "node1"}, + {Name: "node2"}, + }, + "hyperNode2": { + {Name: "node3"}, + }, + }, + }, + { + name: "Missing nodes", + hyperNodes: map[string]sets.Set[string]{ + "hyperNode1": sets.New[string]("node1", "node4"), + "hyperNode2": sets.New[string]("node3"), + }, + allNodes: map[string]*api.NodeInfo{ + "node1": {Name: "node1"}, + "node3": {Name: "node3"}, + }, + expected: map[string][]*api.NodeInfo{ + "hyperNode1": { + {Name: "node1"}, + }, + "hyperNode2": { + {Name: "node3"}, + }, + }, + }, + { + name: "Empty 
hyperNodes", + hyperNodes: map[string]sets.Set[string]{}, + allNodes: map[string]*api.NodeInfo{ + "node1": {Name: "node1"}, + "node2": {Name: "node2"}, + }, + expected: map[string][]*api.NodeInfo{}, + }, + { + name: "Empty allNodes", + hyperNodes: map[string]sets.Set[string]{ + "hyperNode1": sets.New[string]("node1", "node2"), + }, + allNodes: map[string]*api.NodeInfo{}, + expected: map[string][]*api.NodeInfo{ + "hyperNode1": {}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := GetHyperNodeList(tc.hyperNodes, tc.allNodes) + assert.Equal(t, tc.expected, result) + }) + } +} diff --git a/pkg/scheduler/util/test_utils.go b/pkg/scheduler/util/test_utils.go index 9adfc01d1e..8753feb8e4 100644 --- a/pkg/scheduler/util/test_utils.go +++ b/pkg/scheduler/util/test_utils.go @@ -215,6 +215,16 @@ func BuildPodGroup(name, ns, queue string, minMember int32, taskMinMember map[st } } +// BuildPodGroupWithNetWorkTopologies builds podGroup with NetWorkTopologies. +func BuildPodGroupWithNetWorkTopologies(name, ns, queue string, minMember int32, taskMinMember map[string]int32, status schedulingv1beta1.PodGroupPhase, mode string, highestTierAllowed int) *schedulingv1beta1.PodGroup { + pg := BuildPodGroup(name, ns, queue, minMember, taskMinMember, status) + pg.Spec.NetworkTopology = &schedulingv1beta1.NetworkTopologySpec{ + Mode: schedulingv1beta1.NetworkTopologyMode(mode), + HighestTierAllowed: &highestTierAllowed, + } + return pg +} + // BuildPodGroupWithMinResources return podgroup with base spec and phase status and minResources func BuildPodGroupWithMinResources(name, ns, queue string, minMember int32, taskMinMember map[string]int32, minResources v1.ResourceList, status schedulingv1beta1.PodGroupPhase) *schedulingv1beta1.PodGroup { return &schedulingv1beta1.PodGroup{