From fc2daf350f283261344cfac4d4ace6934dffc52c Mon Sep 17 00:00:00 2001 From: Patryk Bundyra Date: Thu, 17 Oct 2024 14:54:58 +0000 Subject: [PATCH] Fix storage class, differentiate storage types in create and delete commands, add --type validation, reduce number of CRD installations --- .github/workflows/build_tests.yaml | 2 +- src/xpk/commands/cluster.py | 4 +++ src/xpk/commands/storage.py | 46 ++++++++++++++---------------- src/xpk/core/storage.py | 8 ++---- src/xpk/parser/storage.py | 3 +- 5 files changed, 32 insertions(+), 31 deletions(-) diff --git a/.github/workflows/build_tests.yaml b/.github/workflows/build_tests.yaml index ff3f752f..958f7fd3 100644 --- a/.github/workflows/build_tests.yaml +++ b/.github/workflows/build_tests.yaml @@ -63,7 +63,7 @@ jobs: run: gcloud auth configure-docker --quiet - name: Create auto-mount Storage instance run: | - python3 xpk.py storage create $STORAGE_NAME --cluster=$TPU_CLUSTER_NAME --zone=us-central2-b --type=test-type \ + python3 xpk.py storage create $STORAGE_NAME --cluster=$TPU_CLUSTER_NAME --zone=us-central2-b --type=gcsfuse \ --auto-mount=true \ --mount-point='/test-mount-point' --readonly=false --manifest='./tests/data/pv-pvc-templates.yaml' - name: List and verify existing Storages diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index 8774426e..c580344a 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -32,6 +32,7 @@ run_gke_node_pool_create_command, set_jobset_on_cluster, set_up_cluster_network_for_gpu, + setup_k8s_env, update_cluster_with_clouddns_if_necessary, update_cluster_with_gcsfuse_driver_if_necessary, update_cluster_with_workload_identity_if_necessary, @@ -43,6 +44,7 @@ install_kueue_on_cluster, ) from ..core.nap import enable_autoprovisioning_on_cluster +from ..core.storage import install_storage_crd from ..core.system_characteristics import ( AcceleratorType, AcceleratorTypeToAcceleratorCharacteristics, @@ -159,6 +161,8 @@ def cluster_create(args) -> None: if install_kueue_on_cluster_code != 0: xpk_exit(install_kueue_on_cluster_code) + k8s_client = setup_k8s_env(args) + install_storage_crd(k8s_client) # Provision node pools dynamically based on incoming workloads: # Currently autoprovisioning is not supported with Pathways. autoprovisioning_config = None diff --git a/src/xpk/commands/storage.py b/src/xpk/commands/storage.py index df76cf5f..0dcdf0b0 100644 --- a/src/xpk/commands/storage.py +++ b/src/xpk/commands/storage.py @@ -25,12 +25,12 @@ update_cluster_with_workload_identity_if_necessary, ) from ..core.storage import ( + GCS_FUSE_TYPE, STORAGE_CRD_KIND, XPK_API_GROUP_NAME, XPK_API_GROUP_VERSION, create_storage_instance, get_storage, - install_storage_crd, list_storages, print_storages_for_cluster, ) @@ -39,24 +39,21 @@ def storage_create(args: Namespace) -> None: k8s_api_client = setup_k8s_env(args) - - install_storage_crd(k8s_api_client) - return_code = update_cluster_with_workload_identity_if_necessary(args) - if return_code > 0: - xpk_exit(return_code) - return_code = update_cluster_with_gcsfuse_driver_if_necessary(args) - if return_code > 0: - xpk_exit(return_code) - create_storage_instance(k8s_api_client, args) - apply_kubectl_manifest(k8s_api_client, args.manifest) + if args.type == GCS_FUSE_TYPE: + return_code = update_cluster_with_workload_identity_if_necessary(args) + if return_code > 0: + xpk_exit(return_code) + return_code = update_cluster_with_gcsfuse_driver_if_necessary(args) + if return_code > 0: + xpk_exit(return_code) + apply_kubectl_manifest(k8s_api_client, args.manifest) def storage_list(args: Namespace) -> None: k8s_api_client = setup_k8s_env(args) - install_storage_crd(k8s_api_client) storages = list_storages(k8s_api_client) - print_storages_for_cluster(storages, args.cluster) + print_storages_for_cluster(storages) def delete_resource(api_call, resource_name: str, resource_kind: str) -> None: @@ -86,20 +83,21 @@ def delete_resource(api_call, resource_name: str, resource_kind: str) -> None: def storage_delete(args: Namespace) -> None: k8s_api_client = setup_k8s_env(args) - install_storage_crd(k8s_api_client) api_instance = k8s_client.CustomObjectsApi(k8s_api_client) core_api = k8s_client.CoreV1Api() storage = get_storage(k8s_api_client, args.name) - delete_resource( - lambda name: core_api.delete_namespaced_persistent_volume_claim( - name, "default" - ), - storage.pvc, - "Persistent Volume Claim", - ) - delete_resource( - core_api.delete_persistent_volume, storage.pv, "Persistent Volume" - ) + if storage.type == GCS_FUSE_TYPE: + delete_resource( + lambda name: core_api.delete_namespaced_persistent_volume_claim( + name, "default" + ), + storage.pvc, + "Persistent Volume Claim", + ) + delete_resource( + core_api.delete_persistent_volume, storage.pv, "Persistent Volume" + ) + delete_resource( lambda name: api_instance.delete_cluster_custom_object( name=name, diff --git a/src/xpk/core/storage.py b/src/xpk/core/storage.py index d02fa280..381da653 100644 --- a/src/xpk/core/storage.py +++ b/src/xpk/core/storage.py @@ -36,6 +36,7 @@ STORAGE_CRD_KIND = "Storage" XPK_API_GROUP_NAME = "xpk.x-k8s.io" XPK_API_GROUP_VERSION = "v1" +GCS_FUSE_TYPE = "gcsfuse" @dataclass @@ -58,7 +59,6 @@ class Storage: name: str type: str - cluster: str auto_mount: bool mount_point: str readonly: bool @@ -78,7 +78,6 @@ def __init__(self, data: dict): self.name = metadata.get("name") spec = data.get("spec", {}) self.type: str = spec.get("type") - self.cluster: str = spec.get("cluster") self.auto_mount: bool = spec.get("auto_mount") self.mount_point: bool = spec.get("mount_point") self.readonly: bool = spec.get("readonly") @@ -282,7 +281,7 @@ def install_storage_crd(k8s_api_client: ApiClient) -> None: xpk_exit(1) -def print_storages_for_cluster(storages: list[Storage], cluster: str): +def print_storages_for_cluster(storages: list[Storage]) -> None: """ Prints in human readable manner a table of Storage resources that belong to the specified cluster. @@ -300,8 +299,7 @@ def print_storages_for_cluster(storages: list[Storage], cluster: str): ] storage_tab = [] for storage in storages: - if storage.cluster == cluster: - storage_tab.append(storage.fields_as_list()) + storage_tab.append(storage.fields_as_list()) print( tabulate( diff --git a/src/xpk/parser/storage.py b/src/xpk/parser/storage.py index e8041852..8404ff4b 100644 --- a/src/xpk/parser/storage.py +++ b/src/xpk/parser/storage.py @@ -57,7 +57,8 @@ def add_storage_create_parser( req_args.add_argument( '--type', type=str, - help='The type of storage', + help='The type of storage. Currently supported types: ["gcsfuse"]', + choices=['gcsfuse'], required=True, ) req_args.add_argument(