Skip to content

Commit

Permalink
Add Storage to workload creation
Browse files Browse the repository at this point in the history
  • Loading branch information
PBundyra committed Oct 8, 2024
1 parent ccfb37f commit e7cbbde
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 14 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/build_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ jobs:
with:
version: '>= 363.0.0'
install_components: 'beta,gke-gcloud-auth-plugin'
- name: Generate random seed
run: |
RANDOM_SEED=$((RANDOM % 10000)) # Generate a random number between 0 and 9999
echo "RANDOM_SEED=$RANDOM_SEED" >> $GITHUB_ENV
- name: Verify gcp setup
run: gcloud info
- name: Set Google Cloud CLI properties to an unused zone to verify --zone arg is passed properly in commands.
Expand Down Expand Up @@ -73,6 +77,7 @@ jobs:
echo -e \
'#!/bin/bash \n
echo "Hello world from a test script!"
cd ~/../test-mount-point && echo "Hello world from a Github Action CI/CD test script!" > '$RANDOM_SEED'.txt' \
> test.sh
- name: Run a base-docker-image workload
run: |
Expand All @@ -87,6 +92,10 @@ jobs:
python3 xpk.py workload create-pathways --cluster $TPU_CLUSTER_NAME --workload $PATHWAYS_WORKLOAD_NAME \
--docker-image='marketplace.gcr.io/google/ubuntu2004' --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b \
--command "echo \"Hello world from a test script! \""
- name: Verify if the file was created in the GCS bucket
run: gsutil cp gs://xpk-ci-cd-tests/$RANDOM_SEED.txt .
- name: Check if the file contains desired content
run: grep 'Hello world from a Github Action CI/CD test script!' $RANDOM_SEED.txt
- name: Wait for Pathways workload completion and confirm it succeeded
run: python3 xpk.py workload list --cluster $TPU_CLUSTER_NAME --zone=us-central2-b --wait-for-job-completion $PATHWAYS_WORKLOAD_NAME --timeout 300
- name: List out the workloads on the cluster
Expand All @@ -95,6 +104,8 @@ jobs:
run: python3 xpk.py workload delete --workload $WORKLOAD_NAME --cluster $TPU_CLUSTER_NAME --zone=us-central2-b
- name: Delete the Pathways workload on the cluster
run: python3 xpk.py workload delete --workload $PATHWAYS_WORKLOAD_NAME --cluster $TPU_CLUSTER_NAME --zone=us-central2-b
- name: Delete created GCS file
run: gsutil rm gs://xpk-ci-cd-tests/$RANDOM_SEED.txt
- name: Delete existing Storage
run: python3 xpk.py storage delete $STORAGE_NAME --cluster $TPU_CLUSTER_NAME --zone=us-central2-b
- name: Delete the cluster created
Expand Down
56 changes: 55 additions & 1 deletion src/xpk/commands/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
)
from ..core.core import (
CLUSTER_METADATA_CONFIGMAP,
GCS_FUSE_ANNOTATION,
VERTEX_TENSORBOARD_FEATURE_FLAG,
AcceleratorTypeToAcceleratorCharacteristics,
add_zone_and_project,
check_if_workload_can_schedule,
check_if_workload_exists,
create_accelerator_label,
create_k8s_service_account,
create_machine_label,
create_vertex_experiment,
get_cluster_configmap,
Expand All @@ -42,6 +44,7 @@
get_volumes,
is_cluster_using_clouddns,
parse_env_config,
setup_k8s_env,
wait_for_job_completion,
xpk_current_version,
zone_to_region,
Expand All @@ -59,6 +62,16 @@
get_pathways_worker_args,
get_user_workload_for_pathways,
)
from ..core.storage import (
XPK_SA,
Storage,
add_bucket_iam_members,
get_storage_volume_mounts_yaml,
get_storage_volume_mounts_yaml_for_gpu,
get_storage_volumes_yaml,
get_storage_volumes_yaml_for_gpu,
get_storages,
)
from ..core.system_characteristics import (
AcceleratorType,
get_system_characteristics,
Expand All @@ -74,6 +87,7 @@
xpk.google.com/workload: {args.workload}
annotations:
alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool # 1:1 job replica to node pool assignment
{gcs_fuse_annotation}
spec:
failurePolicy:
maxRestarts: {args.max_restarts}
Expand All @@ -89,6 +103,8 @@
metadata:
labels:
xpk.google.com/workload: {args.workload}
annotations:
{gcs_fuse_annotation}
spec:
schedulerName: {args.scheduler}
restartPolicy: Never
Expand All @@ -103,6 +119,7 @@
terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
containers:
{container}
serviceAccountName: {service_account}
volumes:
{volumes}
"""
Expand All @@ -122,6 +139,9 @@
- name: slice-job
replicas: 1
template:
metadata:
annotations:
{gcs_fuse_annotation}
spec:
parallelism: {args.num_nodes}
completions: {args.num_nodes}
Expand All @@ -137,11 +157,13 @@
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
serviceAccountName: {service_account}
tolerations:
- operator: "Exists"
key: nvidia.com/gpu
volumes:
{gpu_volume}
{storage_volumes}
containers:
{gpu_rxdm_image}
imagePullPolicy: Always
Expand All @@ -155,6 +177,7 @@
privileged: true
volumeMounts:
{gpu_tcp_volume}
{storage_volume_mounts}
- name: nvidia-install-dir-host
mountPath: /usr/local/nvidia/lib64
- name: workload-terminated-volume
Expand Down Expand Up @@ -193,8 +216,12 @@
completions: {system.vms_per_slice}
parallelism: {system.vms_per_slice}
template:
metadata:
annotations:
{gcs_fuse_annotation}
spec:
terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
serviceAccountName: {service_account}
containers:
- args:
{pathways_worker_args}
Expand All @@ -213,6 +240,7 @@
volumeMounts:
- mountPath: /tmp
name: shared-tmp
{storage_volume_mounts}
nodeSelector:
{accelerator_label}
{machine_label}
Expand All @@ -223,6 +251,7 @@
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
{storage_volumes}
- name: rm
replicas: 1
template:
Expand Down Expand Up @@ -324,6 +353,9 @@ def workload_create(args) -> None:
Returns:
0 if successful and 1 otherwise.
"""
k8s_api_client = setup_k8s_env(args)
create_k8s_service_account(XPK_SA, 'default')

if args.headless and not is_cluster_using_clouddns(args):
xpk_print(
'Please run xpk cluster create-pathways first, to upgrade and enable'
Expand Down Expand Up @@ -402,6 +434,16 @@ def workload_create(args) -> None:
if return_code != 0:
xpk_exit(return_code)

storages: list[Storage] = get_storages(k8s_api_client, args.storage)
gcs_fuse_annotation = ''
service_account = ''
if len(storages) > 0:
gcs_fuse_annotation = GCS_FUSE_ANNOTATION
service_account = XPK_SA
xpk_print(f'Detected Storages to add: {storages}')
else:
xpk_print('No Storages to add detected')

# Create the workload file based on accelerator type or workload type.
if system.accelerator_type == AcceleratorType['GPU']:
container, debugging_dashboard_id = get_user_workload_container(
Expand All @@ -423,6 +465,10 @@ def workload_create(args) -> None:
gpu_rxdm_image=get_gpu_rxdm_image(system),
gpu_rxdm_cmd=get_gpu_rxdm_cmd(system),
gpu_tcp_volume=get_gpu_tcp_volume(system),
storage_volumes=get_storage_volumes_yaml_for_gpu(storages),
storage_volume_mounts=get_storage_volume_mounts_yaml_for_gpu(storages),
gcs_fuse_annotation=gcs_fuse_annotation,
service_account=XPK_SA,
)
elif args.use_pathways and ensure_pathways_workload_prerequisites(
args, system
Expand All @@ -437,13 +483,17 @@ def workload_create(args) -> None:
pathways_rm_args=get_pathways_rm_args(args, system),
pathways_worker_args=get_pathways_worker_args(args),
pathways_proxy_args=get_pathways_proxy_args(args),
user_workload=get_user_workload_for_pathways(args, system),
user_workload=get_user_workload_for_pathways(args, system, storages),
resource_type=AcceleratorTypeToAcceleratorCharacteristics[
system.accelerator_type
].resource_type,
local_queue_name=LOCAL_QUEUE_NAME,
autoprovisioning_args=autoprovisioning_args,
backoff_limit=system.vms_per_slice * 4,
gcs_fuse_annotation=gcs_fuse_annotation,
storage_volumes=get_storage_volumes_yaml(storages),
storage_volume_mounts=get_storage_volume_mounts_yaml(storages),
service_account=XPK_SA,
)
else:
container, debugging_dashboard_id = get_user_workload_container(
Expand All @@ -461,6 +511,8 @@ def workload_create(args) -> None:
local_queue_name=LOCAL_QUEUE_NAME,
autoprovisioning_args=autoprovisioning_args,
volumes=get_volumes(args, system),
gcs_fuse_annotation=gcs_fuse_annotation,
service_account=service_account,
)
tmp = write_tmp_file(yml_string)
command = f'kubectl apply -f {str(tmp.file.name)}'
Expand All @@ -470,6 +522,8 @@ def workload_create(args) -> None:
xpk_print(f'Create Workload request returned ERROR {return_code}')
xpk_exit(return_code)

add_bucket_iam_members(args, storages)

# Get GKE outlier dashboard for TPU
outlier_dashboard_id = None
if system.accelerator_type == AcceleratorType['TPU']:
Expand Down
55 changes: 45 additions & 10 deletions src/xpk/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from google.cloud import resourcemanager_v3
from kubernetes import client as k8s_client
from kubernetes import config
from kubernetes.client.exceptions import ApiException

from ..utils import get_user_input, write_tmp_file, xpk_exit, xpk_print
from .commands import (
Expand All @@ -54,6 +55,7 @@
run_command_with_updates_retry,
run_commands,
)
from .storage import Storage, get_storages
from .system_characteristics import (
AcceleratorType,
AcceleratorTypeToAcceleratorCharacteristics,
Expand Down Expand Up @@ -345,6 +347,20 @@ def setup_k8s_env(args: Namespace) -> k8s_client.ApiClient:
return k8s_client.ApiClient()


def create_k8s_service_account(name: str, namespace: str) -> None:
  """Create a Kubernetes ServiceAccount, tolerating one that already exists.

  Args:
    name: name of the ServiceAccount to create.
    namespace: namespace in which to create the ServiceAccount.

  Raises:
    ApiException: if the API call fails for any reason other than the
      ServiceAccount already existing (HTTP 409 Conflict).
  """
  k8s_core_client = k8s_client.CoreV1Api()
  sa = k8s_client.V1ServiceAccount(metadata=k8s_client.V1ObjectMeta(name=name))

  xpk_print(f'Creating a new service account: {name}')
  try:
    k8s_core_client.create_namespaced_service_account(
        namespace, sa, pretty=True
    )
    xpk_print(f'Created a new service account: {sa} successfully')
  except ApiException as e:
    # Only HTTP 409 Conflict means the account already exists; any other
    # failure (403, 500, ...) must not be silently swallowed.
    if e.status == 409:
      xpk_print(f'Service account: {name} already exists. Skipping its creation')
    else:
      raise


def get_total_chips_requested_from_args(
args, system: SystemCharacteristics
) -> int:
Expand Down Expand Up @@ -2561,16 +2577,25 @@ def get_volumes(args, system: SystemCharacteristics) -> str:
"""
volumes = """- emptyDir:
medium: Memory
name: dshm-2"""
name: dshm-2
"""

if (
system.accelerator_type == AcceleratorType['TPU']
and args.deploy_stacktrace_sidecar
):
volumes += """
- name: tpu-stack-trace
- name: shared-data"""
- name: shared-data
"""

storages: list[Storage] = get_storages(setup_k8s_env(args), args.storage)
for storage in storages:
volumes += f"""- name: {storage.pv}
persistentVolumeClaim:
claimName: {storage.pvc}
readOnly: {storage.readonly}
"""
return volumes


Expand All @@ -2584,20 +2609,22 @@ def get_volume_mounts(args, system: SystemCharacteristics) -> str:
YAML for the volumes mounted within a Pathways container or GPU container as a YAML string.
"""
volume_mount_yaml = """- mountPath: /dev/shm
name: dshm-2"""
name: dshm-2
"""

if args.use_pathways:
volume_mount_yaml = """- mountPath: /tmp
name: shared-tmp"""
name: shared-tmp
"""
elif (
system.accelerator_type == AcceleratorType['TPU']
and args.deploy_stacktrace_sidecar
):
volume_mount_yaml += """
- name: tpu-stack-trace
volume_mount_yaml += """- name: tpu-stack-trace
mountPath: /tmp/debugging
- name: shared-data
mountPath: /shared-volume"""
mountPath: /shared-volume
"""
elif system.accelerator_type == AcceleratorType['GPU']:
if system.device_type == h100_device_type:
volume_mount_yaml = """- name: nvidia-install-dir-host
Expand All @@ -2609,15 +2636,23 @@ def get_volume_mounts(args, system: SystemCharacteristics) -> str:
- name: shared-memory
mountPath: /dev/shm
- name: workload-terminated-volume
mountPath: /usr/share/workload"""
mountPath: /usr/share/workload
"""
elif system.device_type == h100_mega_device_type:
volume_mount_yaml = """- name: nvidia-install-dir-host
mountPath: /usr/local/nvidia/lib64
- name: shared-memory
mountPath: /dev/shm
- name: workload-terminated-volume
mountPath: /usr/share/workload"""

mountPath: /usr/share/workload
"""

storages: list[Storage] = get_storages(setup_k8s_env(args), args.storage)
for storage in storages:
volume_mount_yaml += f"""- name: {storage.pv}
mountPath: {storage.mount_point}
readOnly: {storage.readonly}
"""
return volume_mount_yaml


Expand Down
Loading

0 comments on commit e7cbbde

Please sign in to comment.