Skip to content

Commit

Permalink
Fix storage class, differentiate storage types in create and delete commands, add --type validation, reduce number of CRD installations
Browse files Browse the repository at this point in the history
  • Loading branch information
PBundyra committed Oct 17, 2024
1 parent aff257b commit fc2daf3
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
run: gcloud auth configure-docker --quiet
- name: Create auto-mount Storage instance
run: |
python3 xpk.py storage create $STORAGE_NAME --cluster=$TPU_CLUSTER_NAME --zone=us-central2-b --type=test-type \
python3 xpk.py storage create $STORAGE_NAME --cluster=$TPU_CLUSTER_NAME --zone=us-central2-b --type=gcsfuse \
--auto-mount=true \
--mount-point='/test-mount-point' --readonly=false --manifest='./tests/data/pv-pvc-templates.yaml'
- name: List and verify existing Storages
Expand Down
4 changes: 4 additions & 0 deletions src/xpk/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
run_gke_node_pool_create_command,
set_jobset_on_cluster,
set_up_cluster_network_for_gpu,
setup_k8s_env,
update_cluster_with_clouddns_if_necessary,
update_cluster_with_gcsfuse_driver_if_necessary,
update_cluster_with_workload_identity_if_necessary,
Expand All @@ -43,6 +44,7 @@
install_kueue_on_cluster,
)
from ..core.nap import enable_autoprovisioning_on_cluster
from ..core.storage import install_storage_crd
from ..core.system_characteristics import (
AcceleratorType,
AcceleratorTypeToAcceleratorCharacteristics,
Expand Down Expand Up @@ -159,6 +161,8 @@ def cluster_create(args) -> None:
if install_kueue_on_cluster_code != 0:
xpk_exit(install_kueue_on_cluster_code)

k8s_client = setup_k8s_env(args)
install_storage_crd(k8s_client)
# Provision node pools dynamically based on incoming workloads:
# Currently autoprovisioning is not supported with Pathways.
autoprovisioning_config = None
Expand Down
46 changes: 22 additions & 24 deletions src/xpk/commands/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
update_cluster_with_workload_identity_if_necessary,
)
from ..core.storage import (
GCS_FUSE_TYPE,
STORAGE_CRD_KIND,
XPK_API_GROUP_NAME,
XPK_API_GROUP_VERSION,
create_storage_instance,
get_storage,
install_storage_crd,
list_storages,
print_storages_for_cluster,
)
Expand All @@ -39,24 +39,21 @@

def storage_create(args: Namespace) -> None:
k8s_api_client = setup_k8s_env(args)

install_storage_crd(k8s_api_client)
return_code = update_cluster_with_workload_identity_if_necessary(args)
if return_code > 0:
xpk_exit(return_code)
return_code = update_cluster_with_gcsfuse_driver_if_necessary(args)
if return_code > 0:
xpk_exit(return_code)

create_storage_instance(k8s_api_client, args)
apply_kubectl_manifest(k8s_api_client, args.manifest)
if args.type == GCS_FUSE_TYPE:
return_code = update_cluster_with_workload_identity_if_necessary(args)
if return_code > 0:
xpk_exit(return_code)
return_code = update_cluster_with_gcsfuse_driver_if_necessary(args)
if return_code > 0:
xpk_exit(return_code)
apply_kubectl_manifest(k8s_api_client, args.manifest)


def storage_list(args: Namespace) -> None:
k8s_api_client = setup_k8s_env(args)
install_storage_crd(k8s_api_client)
storages = list_storages(k8s_api_client)
print_storages_for_cluster(storages, args.cluster)
print_storages_for_cluster(storages)


def delete_resource(api_call, resource_name: str, resource_kind: str) -> None:
Expand Down Expand Up @@ -86,20 +83,21 @@ def delete_resource(api_call, resource_name: str, resource_kind: str) -> None:

def storage_delete(args: Namespace) -> None:
k8s_api_client = setup_k8s_env(args)
install_storage_crd(k8s_api_client)
api_instance = k8s_client.CustomObjectsApi(k8s_api_client)
core_api = k8s_client.CoreV1Api()
storage = get_storage(k8s_api_client, args.name)
delete_resource(
lambda name: core_api.delete_namespaced_persistent_volume_claim(
name, "default"
),
storage.pvc,
"Persistent Volume Claim",
)
delete_resource(
core_api.delete_persistent_volume, storage.pv, "Persistent Volume"
)
if storage.type == GCS_FUSE_TYPE:
delete_resource(
lambda name: core_api.delete_namespaced_persistent_volume_claim(
name, "default"
),
storage.pvc,
"Persistent Volume Claim",
)
delete_resource(
core_api.delete_persistent_volume, storage.pv, "Persistent Volume"
)

delete_resource(
lambda name: api_instance.delete_cluster_custom_object(
name=name,
Expand Down
8 changes: 3 additions & 5 deletions src/xpk/core/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
STORAGE_CRD_KIND = "Storage"
XPK_API_GROUP_NAME = "xpk.x-k8s.io"
XPK_API_GROUP_VERSION = "v1"
GCS_FUSE_TYPE = "gcsfuse"


@dataclass
Expand All @@ -58,7 +59,6 @@ class Storage:

name: str
type: str
cluster: str
auto_mount: bool
mount_point: str
readonly: bool
Expand All @@ -78,7 +78,6 @@ def __init__(self, data: dict):
self.name = metadata.get("name")
spec = data.get("spec", {})
self.type: str = spec.get("type")
self.cluster: str = spec.get("cluster")
self.auto_mount: bool = spec.get("auto_mount")
self.mount_point: bool = spec.get("mount_point")
self.readonly: bool = spec.get("readonly")
Expand Down Expand Up @@ -282,7 +281,7 @@ def install_storage_crd(k8s_api_client: ApiClient) -> None:
xpk_exit(1)


def print_storages_for_cluster(storages: list[Storage], cluster: str):
def print_storages_for_cluster(storages: list[Storage]) -> None:
"""
Prints in human readable manner a table of Storage resources that belong to the specified cluster.
Expand All @@ -300,8 +299,7 @@ def print_storages_for_cluster(storages: list[Storage], cluster: str):
]
storage_tab = []
for storage in storages:
if storage.cluster == cluster:
storage_tab.append(storage.fields_as_list())
storage_tab.append(storage.fields_as_list())

print(
tabulate(
Expand Down
3 changes: 2 additions & 1 deletion src/xpk/parser/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def add_storage_create_parser(
req_args.add_argument(
'--type',
type=str,
help='The type of storage',
help='The type of storage. Currently supported types: ["gcsfuse"]',
choices=['gcsfuse'],
required=True,
)
req_args.add_argument(
Expand Down

0 comments on commit fc2daf3

Please sign in to comment.