Commit

Allow TPU Provisioner to be configured to copy Pod labels to Node labels (#788)

* Allow TPU Provisioner to be configured to copy Pod labels to Node labels

* Add annotation to specify which labels to copy to nodes
nstogner authored Sep 23, 2024
1 parent 65c2a98 commit ba10e41
Showing 5 changed files with 142 additions and 31 deletions.
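The change adds two configuration surfaces: a controller-level env var (GCP_POD_TO_NODE_LABELS) listing label keys to always copy, and a per-Pod annotation (tpu-provisioner.cloud.google.com/copy-labels) listing additional keys. A minimal sketch of the Pod-side usage, in the style of the test fixtures below; the label names and values here are illustrative, not taken from the source:

    package main

    import (
    	"fmt"

    	corev1 "k8s.io/api/core/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    func main() {
    	// Illustrative Pod: "team" is opted in via the copy-labels annotation,
    	// so the provisioner would set team=ml-infra on the created node pool;
    	// "build-id" is not listed anywhere and stays a Pod-only label.
    	pod := &corev1.Pod{
    		ObjectMeta: metav1.ObjectMeta{
    			Name:      "train-worker-0",
    			Namespace: "default",
    			Labels: map[string]string{
    				"team":     "ml-infra",
    				"build-id": "1234",
    			},
    			Annotations: map[string]string{
    				"tpu-provisioner.cloud.google.com/copy-labels": "team",
    			},
    		},
    	}
    	fmt.Println(pod.Annotations["tpu-provisioner.cloud.google.com/copy-labels"])
    }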
3 changes: 3 additions & 0 deletions tpu-provisioner/cmd/main.go
@@ -80,6 +80,7 @@ func main() {
GCPNodeServiceAccount string `envconfig:"GCP_NODE_SERVICE_ACCOUNT"`

GCPNodeTags []string `envconfig:"GCP_NODE_TAGS"`
GCPPodToNodeLabels []string `envconfig:"GCP_POD_TO_NODE_LABELS"`
GCPNodeSecondaryDisk string `envconfig:"GCP_NODE_SECONDARY_DISK" default:""`
GCPNodeSecureBoot bool `envconfig:"GCP_NODE_SECURE_BOOT" default:"true"`

@@ -189,6 +190,7 @@ func main() {
"zone", cfg.GCPZone,
"nodeServiceAccount", cfg.GCPNodeServiceAccount,
"nodeTags", cfg.GCPNodeTags,
"podToNodeLabels", cfg.GCPPodToNodeLabels,
)

containers, err := containerv1beta1.NewService(context.Background() /*, option.WithCredentials(creds)*/)
@@ -206,6 +208,7 @@ func main() {
NodeServiceAccount: cfg.GCPNodeServiceAccount,
NodeSecondaryDisk: cfg.GCPNodeSecondaryDisk,
NodeTags: cfg.GCPNodeTags,
PodToNodeLabels: cfg.GCPPodToNodeLabels,
NodeSecureBoot: cfg.GCPNodeSecureBoot,
ForceOnDemand: cfg.GCPForceOnDemand,
},
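A sketch of how the new variable is consumed, assuming the envconfig struct tags above belong to github.com/kelseyhightower/envconfig, which decodes []string fields from comma-separated values; the struct here is a stripped-down stand-in for the real config:

    package main

    import (
    	"fmt"
    	"os"

    	"github.com/kelseyhightower/envconfig"
    )

    type config struct {
    	GCPPodToNodeLabels []string `envconfig:"GCP_POD_TO_NODE_LABELS"`
    }

    func main() {
    	// envconfig splits slice-typed env values on commas.
    	os.Setenv("GCP_POD_TO_NODE_LABELS", "team,cost-center")
    	var cfg config
    	if err := envconfig.Process("", &cfg); err != nil {
    		panic(err)
    	}
    	fmt.Println(cfg.GCPPodToNodeLabels) // [team cost-center]
    }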
3 changes: 3 additions & 0 deletions tpu-provisioner/internal/cloud/common.go
@@ -24,6 +24,9 @@ const (

LabelProvisionerNodepoolID = "provisioner-nodepool-id"

// AnnotationCopyLabels is a Pod annotation whose value is a comma-separated list of Pod label keys to copy to the node pool config (Nodes).
AnnotationCopyLabels = "tpu-provisioner.cloud.google.com/copy-labels"

EventNodePoolCreationStarted = "NodePoolCreationStarted"
EventNodePoolCreationSucceeded = "NodePoolCreationSucceeded"
EventNodePoolCreationFailed = "NodePoolCreationFailed"
25 changes: 25 additions & 0 deletions tpu-provisioner/internal/cloud/gke.go
@@ -262,6 +262,24 @@ func (g *GKE) nodePoolForPod(name string, p *corev1.Pod) (*containerv1beta1.Node
LabelJobSetNamespace: p.Namespace,
}

// Copy configured labels from the Pod to the Node.
for _, key := range g.ClusterContext.PodToNodeLabels {
if val, ok := p.Labels[key]; ok {
labels[key] = val
}
}

// Copy labels specified by annotation to the Node.
for _, key := range strings.Split(getAnnotation(p, AnnotationCopyLabels), ",") {
key = strings.TrimSpace(key)
if key == "" {
continue
}
if val, ok := p.Labels[key]; ok {
labels[key] = val
}
}

for labelKey, labelValue := range p.Spec.NodeSelector {
switch labelKey {
case ICIResiliencyLabel:
@@ -485,3 +503,10 @@ func min(a, b int) int {
}
return b
}

func getAnnotation(p *corev1.Pod, key string) string {
if p.Annotations == nil {
return ""
}
return p.Annotations[key]
}
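Both copy paths read values from the Pod's own labels, so a key selected by either the env-configured list or the annotation lands on the node with the Pod's value, and selecting the same key through both mechanisms is harmless. A self-contained sketch of the same selection logic, re-implemented here for illustration rather than taken verbatim from the GKE type:

    package main

    import (
    	"fmt"
    	"strings"
    )

    const annotationCopyLabels = "tpu-provisioner.cloud.google.com/copy-labels"

    // copyPodLabels mirrors the selection above: keys from the configured list
    // and from the copy-labels annotation are copied when the Pod has them.
    func copyPodLabels(configured []string, podLabels, podAnnotations map[string]string) map[string]string {
    	out := map[string]string{}
    	for _, key := range configured {
    		if val, ok := podLabels[key]; ok {
    			out[key] = val
    		}
    	}
    	for _, key := range strings.Split(podAnnotations[annotationCopyLabels], ",") {
    		key = strings.TrimSpace(key)
    		if key == "" {
    			continue
    		}
    		if val, ok := podLabels[key]; ok {
    			out[key] = val
    		}
    	}
    	return out
    }

    func main() {
    	labels := copyPodLabels(
    		[]string{"team"},
    		map[string]string{"team": "ml-infra", "build-id": "1234", "env": "prod"},
    		map[string]string{annotationCopyLabels: "env"},
    	)
    	fmt.Println(labels) // map[env:prod team:ml-infra]
    }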
7 changes: 5 additions & 2 deletions tpu-provisioner/internal/cloud/gke_context.go
@@ -10,8 +10,11 @@ type GKEContext struct {
NodeServiceAccount string
NodeSecondaryDisk string
NodeTags []string
NodeSecureBoot bool
ForceOnDemand bool
// PodToNodeLabels is a list of label keys whose values will be copied from the Pod
// to the Node.
PodToNodeLabels []string
NodeSecureBoot bool
ForceOnDemand bool
}

func (c GKEContext) ClusterName() string {
135 changes: 106 additions & 29 deletions tpu-provisioner/internal/cloud/gke_test.go
@@ -220,10 +220,12 @@ func TestPodToNodePoolName(t *testing.T) {
func TestNodePoolForPod(t *testing.T) {
trueVar := true
tests := []struct {
desc string
gkeContext GKEContext
selector map[string]string
want *containerv1beta1.NodePool
desc string
gkeContext GKEContext
additionalLabels map[string]string
additionalAnnotations map[string]string
selector map[string]string
want *containerv1beta1.NodePool
}{
{
desc: "simple case",
@@ -449,43 +451,118 @@
UpgradeSettings: &container.UpgradeSettings{MaxSurge: 1},
},
},
{
desc: "labels to copy from pod to node",
gkeContext: GKEContext{
PodToNodeLabels: []string{"should-be-copied"},
},
additionalLabels: map[string]string{
"should-be-copied": "val-a",
"should-not-be-copied": "val-b",
},
want: &containerv1beta1.NodePool{
Config: &container.NodeConfig{
Labels: map[string]string{
"google.com/nodepool-manager": "tpu-provisioner",
"google.com/tpu-provisioner-jobset-name": "jobset-test",
"google.com/tpu-provisioner-jobset-namespace": "default",
"google.com/tpu-provisioner-parent-kind": "job",
"google.com/tpu-provisioner-parent-name": "jobset-test-job-1-0",
"google.com/tpu-provisioner-parent-namespace": "default",
"should-be-copied": "val-a",
},
MachineType: "ct5p-hightpu-4t",
ShieldedInstanceConfig: &container.ShieldedInstanceConfig{EnableIntegrityMonitoring: true},
},
InitialNodeCount: 512,
Locations: []string{""},
Management: &container.NodeManagement{AutoRepair: true, AutoUpgrade: false},
MaxPodsConstraint: &container.MaxPodsConstraint{MaxPodsPerNode: 15},
Name: "test-pool",
PlacementPolicy: &container.PlacementPolicy{TpuTopology: "8x16x16", Type: "COMPACT"},
UpgradeSettings: &container.UpgradeSettings{MaxSurge: 1},
},
},
{
desc: "labels to copy from pod to node by annotation",
additionalLabels: map[string]string{
"copy-me": "val-x",
"dont-copy-me": "val-y",
},
additionalAnnotations: map[string]string{
"tpu-provisioner.cloud.google.com/copy-labels": "copy-me",
},
want: &containerv1beta1.NodePool{
Config: &container.NodeConfig{
Labels: map[string]string{
"google.com/nodepool-manager": "tpu-provisioner",
"google.com/tpu-provisioner-jobset-name": "jobset-test",
"google.com/tpu-provisioner-jobset-namespace": "default",
"google.com/tpu-provisioner-parent-kind": "job",
"google.com/tpu-provisioner-parent-name": "jobset-test-job-1-0",
"google.com/tpu-provisioner-parent-namespace": "default",
"copy-me": "val-x",
},
MachineType: "ct5p-hightpu-4t",
ShieldedInstanceConfig: &container.ShieldedInstanceConfig{EnableIntegrityMonitoring: true},
},
InitialNodeCount: 512,
Locations: []string{""},
Management: &container.NodeManagement{AutoRepair: true, AutoUpgrade: false},
MaxPodsConstraint: &container.MaxPodsConstraint{MaxPodsPerNode: 15},
Name: "test-pool",
PlacementPolicy: &container.PlacementPolicy{TpuTopology: "8x16x16", Type: "COMPACT"},
UpgradeSettings: &container.UpgradeSettings{MaxSurge: 1},
},
},
}
for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
gke := &GKE{
ClusterContext: tc.gkeContext,
}

labels := map[string]string{
"batch.kubernetes.io/controller-uid": "8484279a-de52-4ca1-b01e-130fbded30fb",
"batch.kubernetes.io/job-name": "jobset-test-job-1-0",
"controller-uid": "8484279a-de52-4ca1-b01e-130fbded30fb",
"job-name": "jobset-test-job-1-0",
"jobset.sigs.k8s.io/job-index": "0",
"jobset.sigs.k8s.io/job-key": "random-key",
"jobset.sigs.k8s.io/jobset-name": "jobset-test",
"jobset.sigs.k8s.io/replicatedjob-name": "job-1",
"jobset.sigs.k8s.io/replicatedjob-replicas": "1",
"jobset.sigs.k8s.io/restart-attempt": "0",
}
for k, v := range tc.additionalLabels {
labels[k] = v
}

annotations := map[string]string{
"alpha.jobset.sigs.k8s.io/exclusive-topology": "cloud.google.com/gke-nodepool",
"batch.kubernetes.io/job-completion-index": "0",
"jobset.sigs.k8s.io/job-index": "0",
"jobset.sigs.k8s.io/job-key": "random-key",
"jobset.sigs.k8s.io/jobset-name": "jobset-test",
"jobset.sigs.k8s.io/replicatedjob-name": "job-1",
"jobset.sigs.k8s.io/replicatedjob-replicas": "1",
"jobset.sigs.k8s.io/restart-attempt": "0",
}
for k, v := range tc.additionalAnnotations {
annotations[k] = v
}

pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"alpha.jobset.sigs.k8s.io/exclusive-topology": "cloud.google.com/gke-nodepool",
"batch.kubernetes.io/job-completion-index": "0",
"jobset.sigs.k8s.io/job-index": "0",
"jobset.sigs.k8s.io/job-key": "random-key",
"jobset.sigs.k8s.io/jobset-name": "jobset-test",
"jobset.sigs.k8s.io/replicatedjob-name": "job-1",
"jobset.sigs.k8s.io/replicatedjob-replicas": "1",
"jobset.sigs.k8s.io/restart-attempt": "0",
},
Finalizers: []string{"batch.kubernetes.io/job-tracking"},
Labels: map[string]string{
"batch.kubernetes.io/controller-uid": "8484279a-de52-4ca1-b01e-130fbded30fb",
"batch.kubernetes.io/job-name": "jobset-test-job-1-0",
"controller-uid": "8484279a-de52-4ca1-b01e-130fbded30fb",
"job-name": "jobset-test-job-1-0",
"jobset.sigs.k8s.io/job-index": "0",
"jobset.sigs.k8s.io/job-key": "random-key",
"jobset.sigs.k8s.io/jobset-name": "jobset-test",
"jobset.sigs.k8s.io/replicatedjob-name": "job-1",
"jobset.sigs.k8s.io/replicatedjob-replicas": "1",
"jobset.sigs.k8s.io/restart-attempt": "0",
},
Name: "job-test-6gfwq",
Namespace: "default",
Annotations: annotations,
Labels: labels,
Finalizers: []string{"batch.kubernetes.io/job-tracking"},
Name: "job-test-6gfwq",
Namespace: "default",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "batch/v1",
