From 73af01e4782ec12fdf53133c2a910b869c5c8081 Mon Sep 17 00:00:00 2001 From: Patryk Bundyra Date: Mon, 21 Oct 2024 11:27:57 +0200 Subject: [PATCH] Introduce Storage API (#192) * Introduce Storage API * Fix Gh CI/CD * Improve getting storages to mount to workload * Improve handling exception while deleting storage * Fix adding necessary args and cluster credentials * Fix storage class, differentiate storage types in create and delete commands, add --type validation, reduce number of CRD installations --- .github/workflows/build_tests.yaml | 32 ++- pyproject.toml | 9 +- src/xpk/api/__init__.py | 15 ++ src/xpk/api/storage_crd.yaml | 52 +++++ src/xpk/commands/cluster.py | 52 +---- src/xpk/commands/inspector.py | 6 +- src/xpk/commands/storage.py | 110 +++++++++ src/xpk/commands/workload.py | 15 +- src/xpk/core/core.py | 82 ++++++- src/xpk/core/nap.py | 6 +- src/xpk/core/storage.py | 363 +++++++++++++++++++++++++++++ src/xpk/parser/common.py | 7 +- src/xpk/parser/core.py | 11 +- src/xpk/parser/inspector.py | 2 +- src/xpk/parser/storage.py | 122 ++++++++++ src/xpk/templates/__init__.py | 15 ++ src/xpk/templates/pod.yaml | 0 src/xpk/templates/storage.yaml | 13 ++ src/xpk/utils.py | 57 ++++- tests/data/pv-pvc-templates.yaml | 31 +++ 20 files changed, 916 insertions(+), 84 deletions(-) create mode 100644 src/xpk/api/__init__.py create mode 100644 src/xpk/api/storage_crd.yaml create mode 100644 src/xpk/commands/storage.py create mode 100644 src/xpk/core/storage.py create mode 100644 src/xpk/parser/storage.py create mode 100644 src/xpk/templates/__init__.py create mode 100644 src/xpk/templates/pod.yaml create mode 100644 src/xpk/templates/storage.yaml create mode 100644 tests/data/pv-pvc-templates.yaml diff --git a/.github/workflows/build_tests.yaml b/.github/workflows/build_tests.yaml index 499d91c8..958f7fd3 100644 --- a/.github/workflows/build_tests.yaml +++ b/.github/workflows/build_tests.yaml @@ -24,6 +24,7 @@ env: TPU_CLUSTER_NAME: build-xpk-2-v4-8-nodepools 
WORKLOAD_NAME: xpktest-build-${{ github.run_attempt }} PATHWAYS_WORKLOAD_NAME: xpkpw-build-${{ github.run_attempt }} + STORAGE_NAME: test-storage jobs: cluster-create-and-delete: @@ -54,19 +55,38 @@ jobs: pip install . xpk --help - name: Create a Pathways-enabled XPK Cluster with 2x v4-8 nodepools. Larger num-nodes to avoid master resizing. - run: python xpk.py cluster create-pathways --cluster $TPU_CLUSTER_NAME --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b --default-pool-cpu-machine-type=n1-standard-16 --default-pool-cpu-num-nodes=16 --reservation='${{ secrets.GCP_TPU_V4_RESERVATION }}' + run: | + python3 xpk.py cluster create-pathways --cluster $TPU_CLUSTER_NAME --tpu-type=v4-8 --num-slices=2 \ + --zone=us-central2-b --default-pool-cpu-machine-type=n1-standard-16 --default-pool-cpu-num-nodes=16 \ + --reservation='${{ secrets.GCP_TPU_V4_RESERVATION }}' --enable-workload-identity --enable-gcsfuse-csi-driver - name: Authenticate Docker run: gcloud auth configure-docker --quiet + - name: Create auto-mount Storage instance + run: | + python3 xpk.py storage create $STORAGE_NAME --cluster=$TPU_CLUSTER_NAME --zone=us-central2-b --type=gcsfuse \ + --auto-mount=true \ + --mount-point='/test-mount-point' --readonly=false --manifest='./tests/data/pv-pvc-templates.yaml' + - name: List and verify existing Storages + run: python3 xpk.py storage list --cluster $TPU_CLUSTER_NAME --zone=us-central2-b | tee output.txt | grep 'test-storage' || (echo 'No storage found' && cat output.txt && exit 1) - name: Create test script to execute in workloads - run: echo -e '#!/bin/bash \n echo "Hello world from a test script!"' > test.sh + run: | + echo -e \ + '#!/bin/bash \n + echo "Hello world from a test script!"' \ + > test.sh - name: Run a base-docker-image workload - run: python xpk.py workload create --cluster $TPU_CLUSTER_NAME --workload $WORKLOAD_NAME --command "bash test.sh" --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b + run: | + python3 xpk.py workload create --cluster 
$TPU_CLUSTER_NAME --workload $WORKLOAD_NAME --command "bash test.sh" \ + --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b - name: Run xpk inspector with the workload created above run: python3 xpk.py inspector --cluster $TPU_CLUSTER_NAME --zone=us-central2-b --workload $WORKLOAD_NAME - name: Wait for workload completion and confirm it succeeded run: python3 xpk.py workload list --cluster $TPU_CLUSTER_NAME --zone=us-central2-b --wait-for-job-completion $WORKLOAD_NAME --timeout 300 - name: Run a Pathways workload on Ubuntu base image - run: python xpk.py workload create-pathways --cluster $TPU_CLUSTER_NAME --workload $PATHWAYS_WORKLOAD_NAME --docker-image='marketplace.gcr.io/google/ubuntu2004' --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b --command "echo \"Hello world from a test script! \"" + run: | + python3 xpk.py workload create-pathways --cluster $TPU_CLUSTER_NAME --workload $PATHWAYS_WORKLOAD_NAME \ + --docker-image='marketplace.gcr.io/google/ubuntu2004' --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b \ + --command "echo \"Hello world from a test script! 
\"" - name: Wait for Pathways workload completion and confirm it succeeded run: python3 xpk.py workload list --cluster $TPU_CLUSTER_NAME --zone=us-central2-b --wait-for-job-completion $PATHWAYS_WORKLOAD_NAME --timeout 300 - name: List out the workloads on the cluster @@ -75,9 +95,11 @@ jobs: run: python3 xpk.py workload delete --workload $WORKLOAD_NAME --cluster $TPU_CLUSTER_NAME --zone=us-central2-b - name: Delete the Pathways workload on the cluster run: python3 xpk.py workload delete --workload $PATHWAYS_WORKLOAD_NAME --cluster $TPU_CLUSTER_NAME --zone=us-central2-b + - name: Delete existing Storage + run: python3 xpk.py storage delete $STORAGE_NAME --cluster $TPU_CLUSTER_NAME --zone=us-central2-b - name: Delete the cluster created if: always() - run: python xpk.py cluster delete --cluster $TPU_CLUSTER_NAME --zone=us-central2-b + run: python3 xpk.py cluster delete --cluster $TPU_CLUSTER_NAME --zone=us-central2-b diff --git a/pyproject.toml b/pyproject.toml index 923473ba..ca1b5123 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,11 @@ keywords = [] # pip dependencies installed with `pip install -e .` dependencies = [ - "cloud-accelerator-diagnostics" + "cloud-accelerator-diagnostics", + "kubernetes", + "google-cloud", + "google-api-core", + "tabulate", ] [project.urls] @@ -57,8 +61,9 @@ dev = [ version = {attr = "xpk.core.core.__version__"} [tool.setuptools] -packages = ["xpk", "xpk.parser", "xpk.core", "xpk.commands"] +packages = ["xpk", "xpk.parser", "xpk.core", "xpk.commands", "xpk.api"] package-dir = {"" = "src"} +package-data = {"xpk.api" = ["storage_crd.yaml"]} [tool.pyink] # Formatting configuration to follow Google style-guide. 
diff --git a/src/xpk/api/__init__.py b/src/xpk/api/__init__.py new file mode 100644 index 00000000..e7c0b714 --- /dev/null +++ b/src/xpk/api/__init__.py @@ -0,0 +1,15 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" diff --git a/src/xpk/api/storage_crd.yaml b/src/xpk/api/storage_crd.yaml new file mode 100644 index 00000000..03c47f55 --- /dev/null +++ b/src/xpk/api/storage_crd.yaml @@ -0,0 +1,52 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: storages.xpk.x-k8s.io +spec: + group: xpk.x-k8s.io + versions: + - name: v1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + type: + type: string + cluster: + type: string + auto_mount: + type: boolean + mount_point: + type: string + readonly: + type: boolean + manifest: + type: string + pv: + type: string + pvc: + type: string + required: + - type + - cluster + - auto_mount + - mount_point + - readonly + - manifest + - pvc + - pv + x-kubernetes-validations: + - message: Value is immutable + rule: self == oldSelf + scope: Cluster + names: + plural: storages + singular: storage + kind: Storage + shortNames: + - stg \ No newline at end of file diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index 35c54a93..c580344a 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -14,11 +14,7 @@ limitations under the License. 
""" -from ..core.commands import ( - run_command_for_value, - run_command_with_updates, - run_command_with_updates_retry, -) +from ..core.commands import run_command_for_value, run_command_with_updates from ..core.core import ( VERTEX_TENSORBOARD_FEATURE_FLAG, add_zone_and_project, @@ -27,6 +23,7 @@ create_vertex_tensorboard, delete_cluster_subnets, get_all_clusters_programmatic, + get_cluster_credentials, get_gke_control_plane_version, get_gke_node_pool_version, get_gke_server_config, @@ -35,9 +32,10 @@ run_gke_node_pool_create_command, set_jobset_on_cluster, set_up_cluster_network_for_gpu, + setup_k8s_env, update_cluster_with_clouddns_if_necessary, - update_cluster_with_workload_identity_if_necessary, update_cluster_with_gcsfuse_driver_if_necessary, + update_cluster_with_workload_identity_if_necessary, zone_to_region, ) from ..core.kueue import ( @@ -46,6 +44,7 @@ install_kueue_on_cluster, ) from ..core.nap import enable_autoprovisioning_on_cluster +from ..core.storage import install_storage_crd from ..core.system_characteristics import ( AcceleratorType, AcceleratorTypeToAcceleratorCharacteristics, @@ -113,9 +112,7 @@ def cluster_create(args) -> None: if update_cluster_command_code != 0: xpk_exit(update_cluster_command_code) - set_cluster_command_code = set_cluster_command(args) - if set_cluster_command_code != 0: - xpk_exit(set_cluster_command_code) + get_cluster_credentials(args) # create Vertex Tensorboard for new and existing clusters if create-vertex-tensorboard is set tensorboard_config = {} @@ -164,6 +161,8 @@ def cluster_create(args) -> None: if install_kueue_on_cluster_code != 0: xpk_exit(install_kueue_on_cluster_code) + k8s_client = setup_k8s_env(args) + install_storage_crd(k8s_client) # Provision node pools dynamically based on incoming workloads: # Currently autoprovisioning is not supported with Pathways. 
autoprovisioning_config = None @@ -236,9 +235,7 @@ def cluster_cacheimage(args) -> None: ) add_zone_and_project(args) - set_cluster_command_code = set_cluster_command(args) - if set_cluster_command_code != 0: - xpk_exit(set_cluster_command_code) + get_cluster_credentials(args) system, return_code = get_system_characteristics(args) if return_code > 0: @@ -287,9 +284,7 @@ def cluster_describe(args) -> None: xpk_print(f'Starting nodepool list for cluster: {args.cluster}', flush=True) add_zone_and_project(args) - set_cluster_command_code = set_cluster_command(args) - if set_cluster_command_code != 0: - xpk_exit(set_cluster_command_code) + get_cluster_credentials(args) command = ( f'gcloud container node-pools list --cluster {args.cluster} ' @@ -470,8 +465,8 @@ def run_gke_cluster_create_command( ' --enable-autoscaling' ' --total-min-nodes 1 --total-max-nodes 1000' f' --num-nodes {args.default_pool_cpu_num_nodes}' - f' {args.custom_cluster_arguments}' ' --release-channel rapid' + f' {args.custom_cluster_arguments}' ) if system.accelerator_type == AcceleratorType['GPU']: @@ -502,28 +497,3 @@ def run_gke_cluster_create_command( xpk_print(f'GKE Cluster Create request returned ERROR {return_code}') return 1 return 0 - - -def set_cluster_command(args) -> int: - """Run cluster configuration command to set the kubectl config. - - Args: - args: user provided arguments for running the command. - - Returns: - 0 if successful and 1 otherwise. 
- """ - command = ( - 'gcloud container clusters get-credentials' - f' {args.cluster} --region={zone_to_region(args.zone)}' - f' --project={args.project} &&' - ' kubectl config view && kubectl config set-context --current' - ' --namespace=default' - ) - task = f'get-credentials to cluster {args.cluster}' - return_code = run_command_with_updates_retry( - command, task, args, verbose=False - ) - if return_code != 0: - xpk_print(f'{task} returned ERROR {return_code}') - return return_code diff --git a/src/xpk/commands/inspector.py b/src/xpk/commands/inspector.py index 8fbba5fc..28717844 100644 --- a/src/xpk/commands/inspector.py +++ b/src/xpk/commands/inspector.py @@ -19,11 +19,11 @@ CLUSTER_METADATA_CONFIGMAP, CLUSTER_RESOURCES_CONFIGMAP, add_zone_and_project, + get_cluster_credentials, zone_to_region, ) from ..core.kueue import CLUSTER_QUEUE_NAME, LOCAL_QUEUE_NAME from ..utils import append_tmp_file, write_tmp_file, xpk_exit, xpk_print -from .cluster import set_cluster_command from .workload import get_workload_list @@ -124,9 +124,7 @@ def inspector(args) -> None: xpk_print(args) add_zone_and_project(args) - set_cluster_command_code = set_cluster_command(args) - if set_cluster_command_code != 0: - xpk_exit(set_cluster_command_code) + get_cluster_credentials(args) inspector_file = write_tmp_file( '==================\nXPK inspector OUTPUT:\n==================\n' diff --git a/src/xpk/commands/storage.py b/src/xpk/commands/storage.py new file mode 100644 index 00000000..0dcdf0b0 --- /dev/null +++ b/src/xpk/commands/storage.py @@ -0,0 +1,110 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
def delete_resource(api_call, resource_name: str, resource_kind: str) -> None:
  """Deletes a Kubernetes resource and handles potential API exceptions.

  Args:
    api_call: A callable that takes the resource name and performs the
      actual deletion against the Kubernetes API.
    resource_name: The name of the resource to delete.
    resource_kind: The kind of the resource (e.g., "Persistent Volume Claim").
  """
  xpk_print(f"Deleting {resource_kind}:{resource_name}")
  try:
    api_call(resource_name)
  except ApiException as e:
    if e.status == 404:
      # A 404 during deletion is benign: the resource may already be gone.
      xpk_print(
          f"{resource_kind}: {resource_name} not found. "
          f"Might be already deleted. Error: {e}"
      )
      return
    else:
      xpk_print(f"Encountered error during {resource_kind} deletion: {e}")
      xpk_exit(1)
  xpk_print(f"Deleted {resource_kind}:{resource_name}")
""" add_zone_and_project(args) + get_cluster_credentials(args) if args.headless and not is_cluster_using_clouddns(args): xpk_print( @@ -333,10 +334,6 @@ def workload_create(args) -> None: ) xpk_exit(1) - set_cluster_command_code = set_cluster_command(args) - if set_cluster_command_code != 0: - xpk_exit(set_cluster_command_code) - workload_exists = check_if_workload_exists(args) if workload_exists: @@ -539,9 +536,7 @@ def workload_delete(args) -> None: """ xpk_print('Starting Workload delete', flush=True) add_zone_and_project(args) - set_cluster_command_code = set_cluster_command(args) - if set_cluster_command_code != 0: - xpk_exit(set_cluster_command_code) + get_cluster_credentials(args) will_delete = True if not args.workload: @@ -607,9 +602,7 @@ def workload_list(args) -> None: xpk_print('Starting workload list', flush=True) add_zone_and_project(args) - set_cluster_command_code = set_cluster_command(args) - if set_cluster_command_code != 0: - xpk_exit(set_cluster_command_code) + get_cluster_credentials(args) if args.wait_for_job_completion: return_code = wait_for_job_completion(args) diff --git a/src/xpk/core/core.py b/src/xpk/core/core.py index ee80de3b..4f6e4dbc 100644 --- a/src/xpk/core/core.py +++ b/src/xpk/core/core.py @@ -39,8 +39,14 @@ import string import subprocess import sys +from argparse import Namespace from dataclasses import dataclass +from google.api_core.exceptions import PermissionDenied +from google.cloud import resourcemanager_v3 +from kubernetes import client as k8s_client +from kubernetes import config + from ..utils import get_user_input, write_tmp_file, xpk_exit, xpk_print from .commands import ( run_command_for_value, @@ -76,6 +82,7 @@ AUTOPROVISIONING_CONFIG_VALUE = 'AUTOPROVISION' AUTOPROVISIONING_CONFIG_MINIMUM_KEY = 'minimum_chips' AUTOPROVISIONING_CONFIG_MAXIMUM_KEY = 'maximum_chips' +GCS_FUSE_ANNOTATION = 'gke-gcsfuse/volumes: "true"' class CapacityType(enum.Enum): @@ -194,7 +201,7 @@ def add_zone_and_project(args): args.project = 
def project_id_to_project_number(project_id: str) -> str:
  """Resolve a GCP project id to its numeric project number.

  Args:
    project_id: The alphanumeric id of the GCP project.

  Returns:
    The project number as a string. Exits the process if the caller lacks
    permission to read the project.
  """
  projects_client = resourcemanager_v3.ProjectsClient()
  get_request = resourcemanager_v3.GetProjectRequest()
  get_request.name = f'projects/{project_id}'
  try:
    project: resourcemanager_v3.Project = projects_client.get_project(
        request=get_request
    )
  except PermissionDenied as e:
    xpk_print(
        f"Couldn't translate project id: {project_id} to project number."
        f' Error: {e}'
    )
    xpk_exit(1)
  # Response name has the form "projects/<number>"; keep everything after
  # the first slash.
  project_number = project.name.split('/', 1)[1]
  xpk_print(f'Project number for project: {project_id} is {project_number}')
  return project_number
"clusterDns: CLOUD_DNS" | wc -l' + ' 2> /dev/null | grep "clusterDns: CLOUD_DNS" | wc -l' ) return_code, cloud_dns_matches = run_command_for_value( command, @@ -1980,8 +2013,20 @@ def get_gke_node_pool_version( if args.gke_version is not None: node_pool_gke_version = args.gke_version else: - node_pool_gke_version = current_gke_master_version.strip() - + master_gke_version = current_gke_master_version.strip() + node_pool_gke_version = '' + # Select minimum version which is >= master_gke_version and has the same minor version. + # If this does not exist select maximum version which is < master_gke_version. + for version in gke_server_config.valid_versions: + if ( + (node_pool_gke_version == '' or node_pool_gke_version < version) + and version < master_gke_version + ) or ( + (node_pool_gke_version == '' or node_pool_gke_version > version) + and master_gke_version <= version + and master_gke_version.split('.')[:2] == version.split('.')[:2] + ): + node_pool_gke_version = version is_supported_node_pool_version = ( node_pool_gke_version in gke_server_config.valid_versions ) @@ -1999,6 +2044,31 @@ def get_gke_node_pool_version( return 0, node_pool_gke_version +def get_cluster_credentials(args: Namespace) -> None: + """Run cluster configuration command to set the kubectl config. + + Args: + args: user provided arguments for running the command. + + Returns: + 0 if successful and 1 otherwise. 
+ """ + command = ( + 'gcloud container clusters get-credentials' + f' {args.cluster} --region={zone_to_region(args.zone)}' + f' --project={args.project} &&' + ' kubectl config view && kubectl config set-context --current' + ' --namespace=default' + ) + task = f'get-credentials to cluster {args.cluster}' + return_code = run_command_with_updates_retry( + command, task, args, verbose=False + ) + if return_code != 0: + xpk_print(f'{task} returned ERROR {return_code}') + xpk_exit(return_code) + + def validate_docker_image(docker_image, args) -> int: """Validates that the user provided docker image exists in your project. diff --git a/src/xpk/core/nap.py b/src/xpk/core/nap.py index b6021e96..bcd06a48 100644 --- a/src/xpk/core/nap.py +++ b/src/xpk/core/nap.py @@ -264,13 +264,9 @@ def is_autoprovisioning_enabled( return False, 0 return_code, autoprovisioning_value = get_value_from_map( - system.gke_accelerator, cluster_config_map + system.gke_accelerator, cluster_config_map, verbose=False ) if return_code != 0: - xpk_print( - 'gke_accelerator type not found in config map:' - f' {resources_configmap_name}. Autoprovisioning is not enabled.' - ) return False, 0 if autoprovisioning_value == AUTOPROVISIONING_CONFIG_VALUE: diff --git a/src/xpk/core/storage.py b/src/xpk/core/storage.py new file mode 100644 index 00000000..381da653 --- /dev/null +++ b/src/xpk/core/storage.py @@ -0,0 +1,363 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +import os +from argparse import Namespace +from dataclasses import dataclass + +import yaml +from kubernetes import client as k8s_client +from kubernetes import utils +from kubernetes.client import ApiClient +from kubernetes.client.exceptions import ApiException +from kubernetes.client.models.v1_persistent_volume import V1PersistentVolume +from kubernetes.utils import FailToCreateError +from tabulate import tabulate + +from ..utils import xpk_exit, xpk_print + +XPK_SA = "xpk-sa" +STORAGE_CRD_PATH = "/../api/storage_crd.yaml" +STORAGE_TEMPLATE_PATH = "/../templates/storage.yaml" +STORAGE_CRD_NAME = "storages.xpk.x-k8s.io" +STORAGE_CRD_KIND = "Storage" +XPK_API_GROUP_NAME = "xpk.x-k8s.io" +XPK_API_GROUP_VERSION = "v1" +GCS_FUSE_TYPE = "gcsfuse" + + +@dataclass +class Storage: + """ + Represents a Storage custom resource in Kubernetes. + + Attributes: + name: The name of the Storage resource. + type: The type of storage (e.g., 'GCSFuse'). + cluster: The cluster where the storage is located. + auto_mount: Whether the storage should be automatically mounted to every workload. + mount_point: The path on which a given storage should be mounted for a workload. + readonly: Whether the storage is read-only. + manifest: The path to a yaml file containing PersistentVolume and PersistentVolumeClaim for a given storage. + pvc: The name of the PersistentVolumeClaim associated with the storage. + pv: The name of the PersistentVolume associated with the storage. + bucket: The name of the bucket PersistentVolume refers to. + """ + + name: str + type: str + auto_mount: bool + mount_point: str + readonly: bool + manifest: str + pvc: str + pv: str + bucket: str + + def __init__(self, data: dict): + """ + Initializes a Storage object from a dictionary. + + Args: + data: A dictionary containing the Storage resource definition. 
+ """ + metadata: k8s_client.V1ObjectMeta = data.get("metadata", {}) + self.name = metadata.get("name") + spec = data.get("spec", {}) + self.type: str = spec.get("type") + self.auto_mount: bool = spec.get("auto_mount") + self.mount_point: bool = spec.get("mount_point") + self.readonly: bool = spec.get("readonly") + self.manifest: str = spec.get("manifest") + self.pvc: str = spec.get("pvc") + self.pv: str = spec.get("pv") + self.bucket: str = self._get_bucket() + + def fields_as_list(self) -> list[str]: + """ + Returns a list of fields for display purposes. + + Returns: + A list of strings representing the Storage object's fields. + """ + return [ + self.name, + self.type, + self.auto_mount, + self.mount_point, + self.readonly, + self.manifest, + ] + + def _get_bucket(self) -> str: + """ + Retrieves the bucket name from PersistentVolume definition associated with the storage. + + Returns: + The name of the bucket. + """ + client = k8s_client.CoreV1Api() + try: + pv: V1PersistentVolume = client.read_persistent_volume(self.pv) + except client.ApiException as e: + xpk_print( + f"Exception when calling CoreV1Api->read_persistent_volume: {e}" + ) + return pv.spec.csi.volume_handle + + def get_mount_options(self) -> list[str]: + """ + Retrieves the mount options for the PersistentVolume. + + Returns: + A list of mount options. + """ + client = k8s_client.CoreV1Api() + try: + pv: V1PersistentVolume = client.read_persistent_volume(self.pv) + except client.ApiException as e: + xpk_print( + f"Exception when calling CoreV1Api->read_persistent_volume: {e}" + ) + return pv.spec.mount_options + + +def list_storages(k8s_api_client: ApiClient) -> list[Storage]: + """ + Lists all Storage custom resources in the cluster. + + Args: + k8s_api_client: An ApiClient object for interacting with the Kubernetes API. + + Returns: + A list of Storage objects representing the Storage resources. 
+ """ + api_instance = k8s_client.CustomObjectsApi(k8s_api_client) + try: + resp = api_instance.list_cluster_custom_object( + group=XPK_API_GROUP_NAME, + version=XPK_API_GROUP_VERSION, + plural=STORAGE_CRD_KIND.lower() + "s", + ) + except ApiException as e: + xpk_print(f"Kubernetes API exception while listing Storages: {e}") + xpk_exit(1) + + storages = [] + for stg in resp["items"]: + storage = Storage(stg) + storages.append(storage) + return storages + + +def get_auto_mount_storages(k8s_api_client: ApiClient) -> list[Storage]: + """ + Retrieves all Storage resources that have --auto-mount flag set to true. + + Args: + k8s_api_client: An ApiClient object for interacting with the Kubernetes API. + + Returns: + A list of Storage objects that have `auto_mount` set to True. + """ + auto_mount_storages: list[Storage] = [] + for storage in list_storages(k8s_api_client): + if storage.auto_mount is True: + auto_mount_storages.append(storage) + return auto_mount_storages + + +def get_storages( + k8s_api_client: ApiClient, requested_storages: list[str] +) -> list[Storage]: + """ + Retrieves a list of Storage resources by their names. + + Args: + k8s_api_client: An ApiClient object for interacting with the Kubernetes API. + names: A list of Storage resource names to retrieve. + + Returns: + A list of Storage objects matching the given names. + """ + storages: list[Storage] = [] + all_storages = list_storages(k8s_api_client) + for storage in requested_storages: + if storage in all_storages: + storages.append(storage) + else: + xpk_print( + f"Storage: {storage} not found. Choose one of the available storages:" + f" {all_storages}" + ) + xpk_exit(1) + return storages + + +def get_storages_to_mount( + k8s_api_client: ApiClient, requested_storages: list[str] +) -> list[Storage]: + """ + Retrieves a list of Storage resources by their names, including auto-mounted storages. + + Args: + k8s_api_client: An ApiClient object for interacting with the Kubernetes API. 
def get_storage(k8s_api_client: ApiClient, name: str) -> Storage:
  """Fetch a single Storage custom resource by its name.

  Args:
    k8s_api_client: An ApiClient object for interacting with the Kubernetes
      API.
    name: The name of the Storage resource to retrieve.

  Returns:
    The matching Storage object; exits on Kubernetes API errors.
  """
  custom_api = k8s_client.CustomObjectsApi(k8s_api_client)
  try:
    raw_storage = custom_api.get_cluster_custom_object(
        name=name,
        group=XPK_API_GROUP_NAME,
        version=XPK_API_GROUP_VERSION,
        plural=STORAGE_CRD_KIND.lower() + "s",
    )
    return Storage(raw_storage)
  except ApiException as e:
    xpk_print(f"Kubernetes API exception while getting Storage {name}: {e}")
    xpk_exit(1)
Skipping its creation" + ) + break + else: + xpk_print(f"Encountered error during installing Storage CRD: {e}") + xpk_exit(1) + + +def print_storages_for_cluster(storages: list[Storage]) -> None: + """ + Prints in human readable manner a table of Storage resources that belong to the specified cluster. + + Args: + storages: A list of Storage objects. + cluster: The name of the cluster to filter by. + """ + headers = [ + "NAME", + "TYPE", + "AUTO MOUNT", + "MOUNT POINT", + "READONLY", + "MANIFEST", + ] + storage_tab = [] + for storage in storages: + storage_tab.append(storage.fields_as_list()) + + print( + tabulate( + storage_tab, + headers=headers, + ) + ) + + +def create_storage_instance(k8s_api_client: ApiClient, args: Namespace) -> None: + """ + Creates a new Storage custom resource in the Kubernetes cluster. + + This function reads a Storage template from a YAML file, populates it with + values from the provided arguments, and then creates the Storage object + in the cluster. + + Args: + k8s_api_client: An ApiClient object for interacting with the Kubernetes API. + args: An argparse Namespace object containing the arguments for creating + the Storage resource. 
+ """ + abs_path = f"{os.path.dirname(__file__)}{STORAGE_TEMPLATE_PATH}" + with open(abs_path, "r", encoding="utf-8") as file: + data = yaml.safe_load(file) + + data["metadata"]["name"] = args.name + spec = data["spec"] + spec["cluster"] = args.cluster + spec["type"] = args.type + spec["auto_mount"] = args.auto_mount + spec["mount_point"] = args.mount_point + spec["readonly"] = args.readonly + spec["manifest"] = args.manifest + + with open(args.manifest, "r", encoding="utf-8") as f: + pv_pvc_definitions = yaml.safe_load_all(f) + for obj in pv_pvc_definitions: + if obj["kind"] == "PersistentVolume": + spec["pv"] = obj["metadata"]["name"] + elif obj["kind"] == "PersistentVolumeClaim": + spec["pvc"] = obj["metadata"]["name"] + + data["spec"] = spec + + api_instance = k8s_client.CustomObjectsApi(k8s_api_client) + xpk_print(f"Creating a new Storage: {args.name}") + try: + api_instance.create_cluster_custom_object( + group=XPK_API_GROUP_NAME, + version=XPK_API_GROUP_VERSION, + plural=STORAGE_CRD_KIND.lower() + "s", + body=data, + ) + xpk_print(f"Created {STORAGE_CRD_KIND} object: {data['metadata']['name']}") + except ApiException as e: + if e.status == 409: + xpk_print(f"Storage: {args.name} already exists. Skipping its creation") + else: + xpk_print(f"Encountered error during storage creation: {e}") + xpk_exit(1) diff --git a/src/xpk/parser/common.py b/src/xpk/parser/common.py index 390f7bd2..28514bd9 100644 --- a/src/xpk/parser/common.py +++ b/src/xpk/parser/common.py @@ -17,7 +17,9 @@ import argparse -def add_shared_arguments(custom_parser: argparse.ArgumentParser): +def add_shared_arguments( + custom_parser: argparse.ArgumentParser, required=False +) -> None: """Add shared arguments to the parser. 
Args: @@ -28,6 +30,7 @@ def add_shared_arguments(custom_parser: argparse.ArgumentParser): type=str, default=None, help='GCE project name, defaults to "gcloud config project."', + required=required, ) custom_parser.add_argument( '--zone', @@ -38,6 +41,7 @@ def add_shared_arguments(custom_parser: argparse.ArgumentParser): 'compute/zone." Only one of --zone or --region is allowed in a ' 'command.' ), + required=required, ) custom_parser.add_argument( '--dry-run', @@ -49,4 +53,5 @@ def add_shared_arguments(custom_parser: argparse.ArgumentParser): ' but not run them. This is imperfect in cases where xpk might' ' branch based on the output of commands' ), + required=required, ) diff --git a/src/xpk/parser/core.py b/src/xpk/parser/core.py index 5c104e09..ea5987ad 100644 --- a/src/xpk/parser/core.py +++ b/src/xpk/parser/core.py @@ -19,6 +19,7 @@ from ..utils import xpk_print from .cluster import set_cluster_parser from .inspector import set_inspector_parser +from .storage import set_storage_parser from .workload import set_workload_parsers @@ -27,7 +28,10 @@ def set_parser(parser: argparse.ArgumentParser): title="xpk subcommands", dest="xpk_subcommands", help="Top level commands" ) workload_parser = xpk_subcommands.add_parser( - "workload", help="commands around workload management" + "workload", help="Commands around workload management" + ) + storage_parser = xpk_subcommands.add_parser( + "storage", help="Commands around storage management" ) cluster_parser = xpk_subcommands.add_parser( "cluster", @@ -35,7 +39,7 @@ def set_parser(parser: argparse.ArgumentParser): ) inspector_parser = xpk_subcommands.add_parser( "inspector", - help="commands around investigating workload, and Kueue failures.", + help="Commands around investigating workload, and Kueue failures.", ) def default_subcommand_function( @@ -53,12 +57,15 @@ def default_subcommand_function( parser.print_help() cluster_parser.print_help() workload_parser.print_help() + storage_parser.print_help() return 0 
parser.set_defaults(func=default_subcommand_function) workload_parser.set_defaults(func=default_subcommand_function) cluster_parser.set_defaults(func=default_subcommand_function) + storage_parser.set_defaults(func=default_subcommand_function) set_workload_parsers(workload_parser=workload_parser) set_cluster_parser(cluster_parser=cluster_parser) set_inspector_parser(inspector_parser=inspector_parser) + set_storage_parser(storage_parser=storage_parser) diff --git a/src/xpk/parser/inspector.py b/src/xpk/parser/inspector.py index dfadc6c7..1f2d13b8 100644 --- a/src/xpk/parser/inspector.py +++ b/src/xpk/parser/inspector.py @@ -43,9 +43,9 @@ def set_inspector_parser(inspector_parser): required=True, ) - ### "inspector" Optional Arguments add_shared_arguments(inspector_parser_optional_arguments) + ### "inspector" Optional Arguments inspector_parser_optional_arguments.add_argument( '--workload', type=workload_name_type, diff --git a/src/xpk/parser/storage.py b/src/xpk/parser/storage.py new file mode 100644 index 00000000..8404ff4b --- /dev/null +++ b/src/xpk/parser/storage.py @@ -0,0 +1,122 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +import argparse + +from ..commands.storage import storage_create, storage_delete, storage_list +from .common import add_shared_arguments + + +def set_storage_parser(storage_parser: argparse.ArgumentParser) -> None: + storage_subcommands = storage_parser.add_subparsers( + title='storage subcommands', + dest='xpk_storage_subcommands', + help=( + 'These are commands related to storage management. Look at help for' + ' specific subcommands for more details.' + ), + ) + add_storage_create_parser(storage_subcommands) + add_storage_list_parser(storage_subcommands) + add_storage_delete_parser(storage_subcommands) + + +def add_storage_create_parser( + storage_subcommands_parser: argparse.ArgumentParser, +) -> None: + + storage_create_parser: argparse.ArgumentParser = ( + storage_subcommands_parser.add_parser( + 'create', help='Create XPK Storage.' + ) + ) + storage_create_parser.set_defaults(func=storage_create) + req_args = storage_create_parser.add_argument_group( + 'Required Arguments', + 'Arguments required for storage create.', + ) + add_shared_arguments(req_args) + req_args.add_argument( + 'name', + type=str, + help='The name of storage', + ) + req_args.add_argument( + '--type', + type=str, + help='The type of storage. 
Currently supported types: ["gcsfuse"]', + choices=['gcsfuse'], + required=True, + ) + req_args.add_argument( + '--cluster', + type=str, + required=True, + ) + req_args.add_argument( + '--auto-mount', type=lambda v: v.lower() == 'true', required=True + ) + req_args.add_argument( + '--mount-point', + type=str, + required=True, + ) + req_args.add_argument( + '--readonly', type=lambda v: v.lower() == 'true', required=True + ) + + req_args.add_argument( + '--manifest', + type=str, + required=True, + ) + + +def add_storage_list_parser( + storage_subcommands_parser: argparse.ArgumentParser, +): + storage_list_parser: argparse.ArgumentParser = ( + storage_subcommands_parser.add_parser('list', help='List XPK Storages.') + ) + storage_list_parser.set_defaults(func=storage_list) + add_shared_arguments(storage_list_parser) + req_args = storage_list_parser.add_argument_group( + 'Required Arguments', + 'Arguments required for storage list.', + ) + req_args.add_argument( + '--cluster', + type=str, + ) + + +def add_storage_delete_parser( + storage_subcommands_parser: argparse.ArgumentParser, +): + storage_delete_parser: argparse.ArgumentParser = ( + storage_subcommands_parser.add_parser( + 'delete', help='Delete XPK Storage.' + ) + ) + storage_delete_parser.set_defaults(func=storage_delete) + add_shared_arguments(storage_delete_parser) + + req_args = storage_delete_parser.add_argument_group( + 'Required Arguments', + 'Arguments required for storage delete.', + ) + req_args.add_argument('name', type=str) + req_args.add_argument('--cluster', type=str, required=True) diff --git a/src/xpk/templates/__init__.py b/src/xpk/templates/__init__.py new file mode 100644 index 00000000..c133d2d7 --- /dev/null +++ b/src/xpk/templates/__init__.py @@ -0,0 +1,15 @@ +""" +Copyright 2023 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" diff --git a/src/xpk/templates/pod.yaml b/src/xpk/templates/pod.yaml new file mode 100644 index 00000000..e69de29b diff --git a/src/xpk/templates/storage.yaml b/src/xpk/templates/storage.yaml new file mode 100644 index 00000000..3a6d8471 --- /dev/null +++ b/src/xpk/templates/storage.yaml @@ -0,0 +1,13 @@ +apiVersion: xpk.x-k8s.io/v1 +kind: Storage +metadata: + name: +spec: + auto_mount: + cluster: + manifest: + mount_point: + readonly: + type: + pvc: + pv: diff --git a/src/xpk/utils.py b/src/xpk/utils.py index b984887c..d8576131 100644 --- a/src/xpk/utils.py +++ b/src/xpk/utils.py @@ -20,6 +20,10 @@ import sys import tempfile +import yaml +from kubernetes.client.exceptions import ApiException +from kubernetes.dynamic import DynamicClient + def chunks(lst: list, n: int): """Return a list of n-sized chunks from lst. @@ -34,7 +38,9 @@ def chunks(lst: list, n: int): return [lst[i : i + n] for i in range(0, len(lst), n)] -def get_value_from_map(key: str, map_to_search: dict) -> tuple[int, str | None]: +def get_value_from_map( + key: str, map_to_search: dict, verbose: bool = True +) -> tuple[int, str | None]: """Helper function to get value from a map if the key exists. Args: @@ -50,10 +56,11 @@ def get_value_from_map(key: str, map_to_search: dict) -> tuple[int, str | None]: if value: return 0, value else: - xpk_print( - f'Unable to find key: {key} in map: {map_to_search}.' - f'The map has the following keys: {map_to_search.keys()}' - ) + if verbose: + xpk_print( + f'Unable to find key: {key} in map: {map_to_search}.' 
+ f'The map has the following keys: {map_to_search.keys()}' + ) return 1, value @@ -77,7 +84,6 @@ def make_tmp_files(per_command_name): def write_tmp_file(payload): """Writes `payload` to a temporary file. - Args: payload: The string to be written to the file. @@ -165,3 +171,42 @@ def directory_path_type(value): f'Directory path is invalid. User provided path was {value}' ) return value + + +def apply_kubectl_manifest(client, file_path): + xpk_print('Applying manifest') + dynamic_client = DynamicClient(client) + + with open(file_path, 'r', encoding='utf-8') as f: + manifest_data = yaml.safe_load_all(f) + for obj in manifest_data: + api_version = obj['apiVersion'] + kind = obj['kind'] + namespace = obj.get('metadata', {}).get('namespace', 'default') + + api_resource = dynamic_client.resources.get( + api_version=api_version, kind=kind + ) + + try: + api_resource.get(name=obj['metadata']['name'], namespace=namespace) + api_resource.patch( + body=obj, + namespace=namespace, + name=obj['metadata']['name'], + content_type='application/merge-patch+json', + ) + xpk_print( + f"Updated {kind} '{obj['metadata']['name']}' in namespace" + f" '{namespace}'" + ) + + except ApiException as e: + if e.status == 404: + api_resource.create(body=obj, namespace=namespace) + xpk_print( + f"Applied {kind} '{obj['metadata']['name']}' in namespace" + f" '{namespace}'" + ) + else: + xpk_print(f'Error applying {kind}: {e}') diff --git a/tests/data/pv-pvc-templates.yaml b/tests/data/pv-pvc-templates.yaml new file mode 100644 index 00000000..61da7e5f --- /dev/null +++ b/tests/data/pv-pvc-templates.yaml @@ -0,0 +1,31 @@ +apiVersion: v1 +kind: PersistentVolume +metadata: + name: gcs-fuse-csi-pv +spec: + accessModes: + - ReadWriteMany + capacity: + storage: 1Gi + storageClassName: example-storage-class + mountOptions: + - implicit-dirs + csi: + driver: gcsfuse.csi.storage.gke.io + volumeHandle: xpk-ci-cd-tests + volumeAttributes: + gcsfuseLoggingSeverity: warning +--- +apiVersion: v1 +kind: 
PersistentVolumeClaim +metadata: + name: gcs-fuse-csi-static-pvc + namespace: default +spec: + accessModes: + - ReadWriteMany + resources: + requests: + storage: 1Gi + volumeName: gcs-fuse-csi-pv + storageClassName: example-storage-class