forked from kubeflow/pipelines
-
Notifications
You must be signed in to change notification settings - Fork 2
/
gcp.py
146 lines (122 loc) · 5.84 KB
/
gcp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# Copyright 2018 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Extension module for KFP on GCP deployment."""
from kubernetes.client import V1Toleration, V1Affinity, V1NodeAffinity, \
V1NodeSelector, V1NodeSelectorTerm, V1NodeSelectorRequirement, V1PreferredSchedulingTerm
def use_gcp_secret(secret_name='user-gcp-sa',
                   secret_file_path_in_volume=None,
                   volume_name=None,
                   secret_volume_mount_path='/secret/gcp-credentials'):
    """An operator that configures the container to use GCP service account by
    service account key stored in a Kubernetes secret.

    The secret is added to the task as a volume, mounted into the task's
    container, and both ``GOOGLE_APPLICATION_CREDENTIALS`` and
    ``CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE`` are set to the key file path.

    For cluster setup and alternatives to using service account key, check
    https://www.kubeflow.org/docs/gke/authentication-pipelines/.

    Args:
        secret_name: Name of the Kubernetes secret that holds the key.
        secret_file_path_in_volume: Path of the key file inside the mounted
            volume. It is appended verbatim to ``secret_volume_mount_path``,
            so it should start with '/'. Defaults to '/<secret_name>.json'.
        volume_name: Deprecated. Name of the volume added to the task.
            Defaults to 'gcp-credentials-<secret_name>'; passing a value
            emits a DeprecationWarning.
        secret_volume_mount_path: Directory in the container where the
            secret volume is mounted.

    Returns:
        A function that takes a task, applies the configuration above to it
        and returns the task.
    """
    # permitted values for secret_name = ['admin-gcp-sa', 'user-gcp-sa']
    # (this is not validated here).
    if secret_file_path_in_volume is None:
        secret_file_path_in_volume = '/' + secret_name + '.json'

    if volume_name is None:
        volume_name = 'gcp-credentials-' + secret_name
    else:
        import warnings
        # stacklevel=2 attributes the warning to the code that called
        # use_gcp_secret(). Python's default filters only display a
        # DeprecationWarning when it is attributed to __main__, so without
        # it the warning would point at this module and stay hidden.
        warnings.warn(
            'The volume_name parameter is deprecated and will be removed in next release. The volume names are now generated automatically.',
            DeprecationWarning,
            stacklevel=2)

    def _use_gcp_secret(task):
        # Imported lazily so that building the operator does not itself
        # require the kubernetes client.
        from kubernetes import client as k8s_client

        task = task.add_volume(
            k8s_client.V1Volume(
                name=volume_name,
                secret=k8s_client.V1SecretVolumeSource(
                    secret_name=secret_name,)))

        # Both environment variables point at the same mounted key file.
        credential_file = secret_volume_mount_path + secret_file_path_in_volume

        task.container \
            .add_volume_mount(
                k8s_client.V1VolumeMount(
                    name=volume_name,
                    mount_path=secret_volume_mount_path,
                )
            ) \
            .add_env_variable(
                k8s_client.V1EnvVar(
                    name='GOOGLE_APPLICATION_CREDENTIALS',
                    value=credential_file,
                )
            ) \
            .add_env_variable(
                k8s_client.V1EnvVar(
                    name='CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE',
                    value=credential_file,
                )
            )  # Set GCloud Credentials by using the env var override.
        # TODO: Is there a better way for GCloud to pick up the credential?
        return task

    return _use_gcp_secret
def use_tpu(tpu_cores: int, tpu_resource: str, tf_version: str):
    """Build an operator that adds a GCP TPU request to a container op.

    The returned operator annotates the pod with the TensorFlow version and
    sets a resource limit named after the TPU resource on the container.

    Args:
      tpu_cores: Required. The number of cores of TPU resource, e.g. 8, 32
          or 128. It is converted to a string before being set as the limit.
          Check more details at: https://cloud.google.com/tpu/docs/kubernetes-engine-setup#pod-spec.
      tpu_resource: Required. The resource name of the TPU resource.
          For example, the value can be 'v2', 'preemptible-v1', 'v3' or 'preemptible-v3'.
          Check more details at: https://cloud.google.com/tpu/docs/kubernetes-engine-setup#pod-spec.
      tf_version: Required. The TensorFlow version that the TPU nodes use.
          For example, the value can be '1.12', '1.11', '1.9' or '1.8'.
          Check more details at: https://cloud.google.com/tpu/docs/supported-versions.

    Returns:
      A function that takes a task, applies the TPU settings and returns
      the task.
    """
    version_annotation = 'tf-version.cloud-tpus.google.com'
    limit_name_template = 'cloud-tpus.google.com/{}'

    def _set_tpu_spec(op):
        # Pod-level annotation carrying the TensorFlow version.
        op.add_pod_annotation(version_annotation, tf_version)

        # Container-level limit, e.g. 'cloud-tpus.google.com/v2' -> '8'.
        limit_name = limit_name_template.format(tpu_resource)
        core_count = str(tpu_cores)
        op.container.add_resource_limit(limit_name, core_count)
        return op

    return _set_tpu_spec
def use_preemptible_nodepool(toleration: V1Toleration = V1Toleration(
        effect='NoSchedule', key='preemptible', operator='Equal', value='true'),
                             hard_constraint: bool = False):
    """Build an operator that steers a container op onto GKE preemptible nodes.

    The returned operator adds the toleration to the task and attaches a node
    affinity that matches nodes labelled
    ``cloud.google.com/gke-preemptible`` in ``['true']``.

    Args:
      toleration: Toleration added to the task. The default tolerates the
          'preemptible' key equal to 'true' with effect 'NoSchedule'.
      hard_constraint: When true, the node affinity is a required
          (hard) scheduling rule; otherwise it is a preferred rule with
          weight 50. (Default: False)

    Returns:
      A function that takes a task, applies the toleration and affinity and
      returns the task.
    """

    def _set_preemptible(op):
        op.add_toleration(toleration)

        # Selector term shared by both the hard and the soft variant.
        preemptible_label = V1NodeSelectorRequirement(
            key='cloud.google.com/gke-preemptible',
            operator='In',
            values=['true'])
        preemptible_term = V1NodeSelectorTerm(
            match_expressions=[preemptible_label])

        if not hard_constraint:
            # Soft rule: scheduler prefers, but does not require, a match.
            soft_rule = V1PreferredSchedulingTerm(
                preference=preemptible_term, weight=50)
            scheduling = V1NodeAffinity(
                preferred_during_scheduling_ignored_during_execution=[
                    soft_rule
                ])
        else:
            # Hard rule: the pod is only scheduled onto matching nodes.
            hard_rule = V1NodeSelector(
                node_selector_terms=[preemptible_term])
            scheduling = V1NodeAffinity(
                required_during_scheduling_ignored_during_execution=hard_rule)

        op.add_affinity(affinity=V1Affinity(node_affinity=scheduling))
        return op

    return _set_preemptible
def add_gpu_toleration(toleration: V1Toleration = V1Toleration(
        effect='NoSchedule', key='nvidia.com/gpu', operator='Equal', value='true')):
    """An operator that configures the GKE GPU nodes in a container op.

    Args:
      toleration: toleration to pods, default is the nvidia.com/gpu label
          (key 'nvidia.com/gpu' equal to 'true', effect 'NoSchedule').
          Note that the default object is created once, when this function
          is defined, and is shared by every task that uses the default.

    Returns:
      A function that takes a task, adds the toleration to it and returns
      the task.
    """

    def _set_toleration(task):
        task.add_toleration(toleration)
        # Return the task, as the other operators in this module do, so the
        # result of applying this operator can be chained further.
        return task

    return _set_toleration