Allow TPU Provisioner to be configured to copy Pod labels to Node labels #788

Merged
3 changes: 3 additions & 0 deletions tpu-provisioner/cmd/main.go
@@ -80,6 +80,7 @@ func main() {
GCPNodeServiceAccount string `envconfig:"GCP_NODE_SERVICE_ACCOUNT"`

GCPNodeTags []string `envconfig:"GCP_NODE_TAGS"`
GCPPodToNodeLabels []string `envconfig:"GCP_POD_TO_NODE_LABELS"`
GCPNodeSecondaryDisk string `envconfig:"GCP_NODE_SECONDARY_DISK" default:""`
GCPNodeSecureBoot bool `envconfig:"GCP_NODE_SECURE_BOOT" default:"true"`

@@ -189,6 +190,7 @@ func main() {
"zone", cfg.GCPZone,
"nodeServiceAccount", cfg.GCPNodeServiceAccount,
"nodeTags", cfg.GCPNodeTags,
"podToNodeLabels", cfg.GCPPodToNodeLabels,
)

containers, err := containerv1beta1.NewService(context.Background() /*, option.WithCredentials(creds)*/)
@@ -206,6 +208,7 @@ func main() {
NodeServiceAccount: cfg.GCPNodeServiceAccount,
NodeSecondaryDisk: cfg.GCPNodeSecondaryDisk,
NodeTags: cfg.GCPNodeTags,
PodToNodeLabels: cfg.GCPPodToNodeLabels,
NodeSecureBoot: cfg.GCPNodeSecureBoot,
ForceOnDemand: cfg.GCPForceOnDemand,
},
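For context on how the new GCP_POD_TO_NODE_LABELS variable is parsed: the struct tags above suggest the kelseyhightower/envconfig package, which splits a comma-separated environment value into a []string field. A minimal standalone sketch (the config struct and the example variable value below are invented for illustration, not part of this diff):

package main

import (
	"fmt"

	"github.com/kelseyhightower/envconfig"
)

type config struct {
	// Mirrors the field added above; envconfig splits slice values on commas.
	GCPPodToNodeLabels []string `envconfig:"GCP_POD_TO_NODE_LABELS"`
}

func main() {
	// e.g. export GCP_POD_TO_NODE_LABELS="team,cost-center"
	var cfg config
	if err := envconfig.Process("", &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.GCPPodToNodeLabels) // [team cost-center]
}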
3 changes: 3 additions & 0 deletions tpu-provisioner/internal/cloud/common.go
@@ -24,6 +24,9 @@ const (

LabelProvisionerNodepoolID = "provisioner-nodepool-id"

// AnnotationCopyLabels is a comma-separated list of labels to copy from the Pod to the node pool config (Nodes).
AnnotationCopyLabels = "tpu-provisioner.cloud.google.com/copy-labels"

EventNodePoolCreationStarted = "NodePoolCreationStarted"
EventNodePoolCreationSucceeded = "NodePoolCreationSucceeded"
EventNodePoolCreationFailed = "NodePoolCreationFailed"
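To illustrate the annotation's intended use, here is a sketch of a Pod that opts in to label copying; the Pod name and the label keys ("team", "env") are invented for this example:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Mirrors the constant added above in common.go.
const annotationCopyLabels = "tpu-provisioner.cloud.google.com/copy-labels"

func main() {
	// A Pod whose "team" and "env" labels would be copied to the node
	// pool config; annotation keys absent from the Pod's labels are ignored.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "trainer-0",
			Labels: map[string]string{
				"team": "ml-infra",
				"env":  "prod",
			},
			Annotations: map[string]string{
				annotationCopyLabels: "team, env",
			},
		},
	}
	fmt.Println(pod.Annotations[annotationCopyLabels]) // "team, env"
}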
25 changes: 25 additions & 0 deletions tpu-provisioner/internal/cloud/gke.go
@@ -262,6 +262,24 @@ func (g *GKE) nodePoolForPod(name string, p *corev1.Pod) (*containerv1beta1.Node
LabelJobSetNamespace: p.Namespace,
}

// Copy configured labels from the Pod to the Node.
for _, key := range g.ClusterContext.PodToNodeLabels {
if val, ok := p.Labels[key]; ok {
labels[key] = val
}
}

// Copy labels specified by annotation to the Node.
for _, key := range strings.Split(getAnnotation(p, AnnotationCopyLabels), ",") {
key = strings.TrimSpace(key)
if key == "" {
continue
}
if val, ok := p.Labels[key]; ok {
labels[key] = val
}
}

for labelKey, labelValue := range p.Spec.NodeSelector {
switch labelKey {
case ICIResiliencyLabel:
@@ -485,3 +503,10 @@ func min(a, b int) int {
}
return b
}

func getAnnotation(p *corev1.Pod, key string) string {
if p.Annotations == nil {
return ""
}
return p.Annotations[key]
}
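Taken together, the two loops in nodePoolForPod implement a simple selection rule: a Pod label is copied to the Node only if its key appears either in the cluster-wide PodToNodeLabels list or in the Pod's comma-separated copy-labels annotation. A self-contained restatement of that logic (the function name copyPodLabels is invented for this sketch, not part of the diff):

package main

import (
	"fmt"
	"strings"
)

// copyPodLabels restates the selection logic above: gather candidate keys
// from the configured list and the annotation value, trim blanks, and copy
// only keys that actually exist as Pod labels.
func copyPodLabels(podLabels map[string]string, configured []string, annotation string) map[string]string {
	out := map[string]string{}
	keys := append([]string{}, configured...)
	keys = append(keys, strings.Split(annotation, ",")...)
	for _, key := range keys {
		key = strings.TrimSpace(key)
		if key == "" {
			continue
		}
		if val, ok := podLabels[key]; ok {
			out[key] = val
		}
	}
	return out
}

func main() {
	got := copyPodLabels(
		map[string]string{"team": "ml-infra", "secret": "x"},
		[]string{"team"},
		"env, team",
	)
	fmt.Println(got) // map[team:ml-infra]
}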
7 changes: 5 additions & 2 deletions tpu-provisioner/internal/cloud/gke_context.go
@@ -10,8 +10,11 @@ type GKEContext struct {
NodeServiceAccount string
NodeSecondaryDisk string
NodeTags []string
NodeSecureBoot bool
ForceOnDemand bool
// PodToNodeLabels is a list of Pod label keys whose values will be copied
// from the Pod to the Node.
PodToNodeLabels []string
NodeSecureBoot bool
ForceOnDemand bool
}

func (c GKEContext) ClusterName() string {
135 changes: 106 additions & 29 deletions tpu-provisioner/internal/cloud/gke_test.go
@@ -220,10 +220,12 @@ func TestPodToNodePoolName(t *testing.T) {
func TestNodePoolForPod(t *testing.T) {
trueVar := true
tests := []struct {
desc string
gkeContext GKEContext
selector map[string]string
want *containerv1beta1.NodePool
desc string
gkeContext GKEContext
additionalLabels map[string]string
additionalAnnotations map[string]string
selector map[string]string
want *containerv1beta1.NodePool
}{
{
desc: "simple case",
@@ -449,43 +451,118 @@
UpgradeSettings: &container.UpgradeSettings{MaxSurge: 1},
},
},
{
desc: "labels to copy from pod to node",
gkeContext: GKEContext{
PodToNodeLabels: []string{"should-be-copied"},
},
additionalLabels: map[string]string{
"should-be-copied": "val-a",
"should-not-be-copied": "val-b",
},
want: &containerv1beta1.NodePool{
Config: &container.NodeConfig{
Labels: map[string]string{
"google.com/nodepool-manager": "tpu-provisioner",
"google.com/tpu-provisioner-jobset-name": "jobset-test",
"google.com/tpu-provisioner-jobset-namespace": "default",
"google.com/tpu-provisioner-parent-kind": "job",
"google.com/tpu-provisioner-parent-name": "jobset-test-job-1-0",
"google.com/tpu-provisioner-parent-namespace": "default",
"should-be-copied": "val-a",
},
MachineType: "ct5p-hightpu-4t",
ShieldedInstanceConfig: &container.ShieldedInstanceConfig{EnableIntegrityMonitoring: true},
},
InitialNodeCount: 512,
Locations: []string{""},
Management: &container.NodeManagement{AutoRepair: true, AutoUpgrade: false},
MaxPodsConstraint: &container.MaxPodsConstraint{MaxPodsPerNode: 15},
Name: "test-pool",
PlacementPolicy: &container.PlacementPolicy{TpuTopology: "8x16x16", Type: "COMPACT"},
UpgradeSettings: &container.UpgradeSettings{MaxSurge: 1},
},
},
{
desc: "labels to copy from pod to node by annotation",
additionalLabels: map[string]string{
"copy-me": "val-x",
"dont-copy-me": "val-y",
},
additionalAnnotations: map[string]string{
"tpu-provisioner.cloud.google.com/copy-labels": "copy-me",
},
want: &containerv1beta1.NodePool{
Config: &container.NodeConfig{
Labels: map[string]string{
"google.com/nodepool-manager": "tpu-provisioner",
"google.com/tpu-provisioner-jobset-name": "jobset-test",
"google.com/tpu-provisioner-jobset-namespace": "default",
"google.com/tpu-provisioner-parent-kind": "job",
"google.com/tpu-provisioner-parent-name": "jobset-test-job-1-0",
"google.com/tpu-provisioner-parent-namespace": "default",
"copy-me": "val-x",
},
MachineType: "ct5p-hightpu-4t",
ShieldedInstanceConfig: &container.ShieldedInstanceConfig{EnableIntegrityMonitoring: true},
},
InitialNodeCount: 512,
Locations: []string{""},
Management: &container.NodeManagement{AutoRepair: true, AutoUpgrade: false},
MaxPodsConstraint: &container.MaxPodsConstraint{MaxPodsPerNode: 15},
Name: "test-pool",
PlacementPolicy: &container.PlacementPolicy{TpuTopology: "8x16x16", Type: "COMPACT"},
UpgradeSettings: &container.UpgradeSettings{MaxSurge: 1},
},
},
}
for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
gke := &GKE{
ClusterContext: tc.gkeContext,
}

labels := map[string]string{
"batch.kubernetes.io/controller-uid": "8484279a-de52-4ca1-b01e-130fbded30fb",
"batch.kubernetes.io/job-name": "jobset-test-job-1-0",
"controller-uid": "8484279a-de52-4ca1-b01e-130fbded30fb",
"job-name": "jobset-test-job-1-0",
"jobset.sigs.k8s.io/job-index": "0",
"jobset.sigs.k8s.io/job-key": "random-key",
"jobset.sigs.k8s.io/jobset-name": "jobset-test",
"jobset.sigs.k8s.io/replicatedjob-name": "job-1",
"jobset.sigs.k8s.io/replicatedjob-replicas": "1",
"jobset.sigs.k8s.io/restart-attempt": "0",
}
for k, v := range tc.additionalLabels {
labels[k] = v
}

annotations := map[string]string{
"alpha.jobset.sigs.k8s.io/exclusive-topology": "cloud.google.com/gke-nodepool",
"batch.kubernetes.io/job-completion-index": "0",
"jobset.sigs.k8s.io/job-index": "0",
"jobset.sigs.k8s.io/job-key": "random-key",
"jobset.sigs.k8s.io/jobset-name": "jobset-test",
"jobset.sigs.k8s.io/replicatedjob-name": "job-1",
"jobset.sigs.k8s.io/replicatedjob-replicas": "1",
"jobset.sigs.k8s.io/restart-attempt": "0",
}
for k, v := range tc.additionalAnnotations {
annotations[k] = v
}

pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"alpha.jobset.sigs.k8s.io/exclusive-topology": "cloud.google.com/gke-nodepool",
"batch.kubernetes.io/job-completion-index": "0",
"jobset.sigs.k8s.io/job-index": "0",
"jobset.sigs.k8s.io/job-key": "random-key",
"jobset.sigs.k8s.io/jobset-name": "jobset-test",
"jobset.sigs.k8s.io/replicatedjob-name": "job-1",
"jobset.sigs.k8s.io/replicatedjob-replicas": "1",
"jobset.sigs.k8s.io/restart-attempt": "0",
},
Finalizers: []string{"batch.kubernetes.io/job-tracking"},
Labels: map[string]string{
"batch.kubernetes.io/controller-uid": "8484279a-de52-4ca1-b01e-130fbded30fb",
"batch.kubernetes.io/job-name": "jobset-test-job-1-0",
"controller-uid": "8484279a-de52-4ca1-b01e-130fbded30fb",
"job-name": "jobset-test-job-1-0",
"jobset.sigs.k8s.io/job-index": "0",
"jobset.sigs.k8s.io/job-key": "random-key",
"jobset.sigs.k8s.io/jobset-name": "jobset-test",
"jobset.sigs.k8s.io/replicatedjob-name": "job-1",
"jobset.sigs.k8s.io/replicatedjob-replicas": "1",
"jobset.sigs.k8s.io/restart-attempt": "0",
},
Name: "job-test-6gfwq",
Namespace: "default",
Annotations: annotations,
Labels: labels,
Finalizers: []string{"batch.kubernetes.io/job-tracking"},
Name: "job-test-6gfwq",
Namespace: "default",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "batch/v1",