Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{Containerapp} az containerapp env certificate list/upload: Refactor command with decorator #7295

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/containerapp/azext_containerapp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def load_command_table(self, _):

with self.command_group('containerapp env certificate') as g:
g.custom_command('create', 'create_managed_certificate', is_preview=True)
g.custom_command('upload', 'upload_certificate')
g.custom_command('list', 'list_certificates', is_preview=True)
g.custom_command('delete', 'delete_certificate', confirmation=True, exception_handler=ex_handler_factory(), is_preview=True)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Any, Dict
from knack.log import get_logger

from azure.cli.command_modules.containerapp._utils import certificate_matches, certificate_location_matches, \
load_cert_file, generate_randomized_cert_name
from azure.cli.command_modules.containerapp.base_resource import BaseResource
from azure.cli.core.azclierror import MutuallyExclusiveArgumentError, ValidationError
from azure.cli.core.commands import AzCliCommand
from knack.prompting import prompt_y_n
from knack.util import CLIError
from msrestazure.tools import is_valid_resource_id, parse_resource_id

from ._constants import PRIVATE_CERTIFICATE_RT, MANAGED_CERTIFICATE_RT, CHECK_CERTIFICATE_NAME_AVAILABILITY_TYPE, \
NAME_ALREADY_EXISTS, NAME_INVALID
from ._client_factory import handle_non_404_exception, handle_raw_exception
from ._models import ContainerAppCertificateEnvelope as ContainerAppCertificateEnvelopeModel

logger = get_logger(__name__)


class ContainerappEnvCertificateDecorator(BaseResource):
def get_argument_location(self):
return self.get_param("location")

def get_argument_thumbprint(self):
return self.get_param("thumbprint")

def get_argument_certificate(self):
return self.get_param("certificate")

def get_argument_managed_certificates_only(self):
return self.get_param("managed_certificates_only")

def get_argument_private_key_certificates_only(self):
return self.get_param("private_key_certificates_only")

def check_cert_name_availability(self, resource_group_name, name, cert_name):
name_availability_request = {"name": cert_name, "type": CHECK_CERTIFICATE_NAME_AVAILABILITY_TYPE}
try:
return self.client.check_name_availability(self.cmd, resource_group_name, name, name_availability_request)
except CLIError as e:
handle_raw_exception(e)

def get_private_certificates(self, certificate_name=None):
if certificate_name:
try:
r = self.client.show_certificate(self.cmd, self.get_argument_resource_group_name(), self.get_argument_name(), certificate_name)
return [r] if certificate_matches(r, self.get_argument_location(), self.get_argument_thumbprint()) else []
except Exception as e:
handle_non_404_exception(e)
return []
else:
try:
r = self.client.list_certificates(self.cmd, self.get_argument_resource_group_name(), self.get_argument_name())
return list(filter(lambda c: certificate_matches(c, self.get_argument_location(), self.get_argument_thumbprint()), r))
except Exception as e:
handle_raw_exception(e)

def get_managed_certificates(self, certificate_name=None):
if certificate_name:
try:
r = self.client.show_managed_certificate(self.cmd, self.get_argument_resource_group_name(), self.get_argument_name(), certificate_name)
return [r] if certificate_location_matches(r, self.get_argument_location()) else []
except Exception as e:
handle_non_404_exception(e)
return []
else:
try:
r = self.client.list_managed_certificates(self.cmd, self.get_argument_resource_group_name(), self.get_argument_name())
return list(filter(lambda c: certificate_location_matches(c, self.get_argument_location()), r))
except Exception as e:
handle_raw_exception(e)


class ContainerappEnvCertificateListDecorator(ContainerappEnvCertificateDecorator):
Greedygre marked this conversation as resolved.
Show resolved Hide resolved
def validate_arguments(self):
if self.get_argument_managed_certificates_only() and self.get_argument_private_key_certificates_only():
raise MutuallyExclusiveArgumentError(
"Use either '--managed-certificates-only' or '--private-key-certificates-only'.")
if self.get_argument_managed_certificates_only() and self.get_argument_thumbprint():
raise MutuallyExclusiveArgumentError("'--thumbprint' not supported for managed certificates.")

def list(self):
if self.get_argument_certificate() and is_valid_resource_id(self.get_argument_certificate()):
certificate_name = parse_resource_id(self.get_argument_certificate())["resource_name"]
certificate_type = parse_resource_id(self.get_argument_certificate())["resource_type"]
else:
certificate_name = self.get_argument_certificate()
certificate_type = PRIVATE_CERTIFICATE_RT if self.get_argument_private_key_certificates_only() or self.get_argument_thumbprint() else (MANAGED_CERTIFICATE_RT if self.get_argument_managed_certificates_only() else None)

if certificate_type == MANAGED_CERTIFICATE_RT:
return self.get_managed_certificates(certificate_name)
if certificate_type == PRIVATE_CERTIFICATE_RT:
return self.get_private_certificates(certificate_name)
managed_certs = self.get_managed_certificates(certificate_name)
private_certs = self.get_private_certificates(certificate_name)
return managed_certs + private_certs


class ContainerappEnvCertificateUploadDecorator(ContainerappEnvCertificateDecorator):
def __init__(self, cmd: AzCliCommand, client: Any, raw_parameters: Dict, models: str):
super().__init__(cmd, client, raw_parameters, models)
self.certificate = ContainerAppCertificateEnvelopeModel
self.cert_name = None

def get_argument_certificate_name(self):
return self.get_param("certificate_name")

def get_argument_certificate_file(self):
return self.get_param("certificate_file")

def get_argument_certificate_password(self):
return self.get_param("certificate_password")

def get_argument_prompt(self):
return self.get_param("prompt")

def construct_payload(self):
blob, thumbprint = load_cert_file(self.get_argument_certificate_file(), self.get_argument_certificate_password())

if self.get_argument_certificate_name():
name_availability = self.check_cert_name_availability(self.get_argument_resource_group_name(), self.get_argument_name(), self.get_argument_certificate_name())
if not name_availability["nameAvailable"]:
if name_availability["reason"] == NAME_ALREADY_EXISTS:
msg = '{}. If continue with this name, it will be overwritten by the new certificate file.\nOverwrite?'
overwrite = True
if self.get_argument_prompt():
overwrite = prompt_y_n(msg.format(name_availability["message"]))
else:
logger.warning('{}. It will be overwritten by the new certificate file.'.format(
name_availability["message"]))
if overwrite:
self.cert_name = self.get_argument_certificate_name()
else:
raise ValidationError(name_availability["message"])
else:
self.cert_name = self.get_argument_certificate_name()

while not self.cert_name:
random_name = generate_randomized_cert_name(thumbprint, self.get_argument_name(), self.get_argument_resource_group_name())
check_result = self.check_cert_name_availability(self.get_argument_resource_group_name(), self.get_argument_name(), random_name)
if check_result["nameAvailable"]:
self.cert_name = random_name
elif not check_result["nameAvailable"] and (check_result["reason"] == NAME_INVALID):
raise ValidationError(check_result["message"])

self.certificate["properties"]["password"] = self.get_argument_certificate_password()
self.certificate["properties"]["value"] = blob
self.certificate["location"] = self.get_argument_location()
if not self.certificate["location"]:
try:
managed_env = self.client.show(self.cmd, self.get_argument_resource_group_name(), self.get_argument_name())
self.certificate["location"] = managed_env["location"]
except Exception as e:
handle_raw_exception(e)

def create_or_update(self):
try:
r = self.client.create_or_update_certificate(self.cmd, self.get_argument_resource_group_name(), self.get_argument_name(), self.cert_name, self.certificate)
return r
except Exception as e:
handle_raw_exception(e)
31 changes: 29 additions & 2 deletions src/containerapp/azext_containerapp/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
from msrestazure.tools import parse_resource_id, is_valid_resource_id
from msrest.exceptions import DeserializationError

from .containerapp_env_certificate_decorator import ContainerappEnvCertificateListDecorator, \
ContainerappEnvCertificateUploadDecorator
from .connected_env_decorator import ConnectedEnvironmentDecorator, ConnectedEnvironmentCreateDecorator
from .containerapp_job_decorator import ContainerAppJobPreviewCreateDecorator
from .containerapp_env_decorator import ContainerappEnvPreviewCreateDecorator, ContainerappEnvPreviewUpdateDecorator
Expand Down Expand Up @@ -1236,8 +1238,33 @@ def create_managed_certificate(cmd, name, resource_group_name, hostname, validat


def list_certificates(cmd, name, resource_group_name, location=None, certificate=None, thumbprint=None, managed_certificates_only=False, private_key_certificates_only=False):
from azure.cli.command_modules.containerapp.custom import list_certificates_logic
return list_certificates_logic(cmd, name, resource_group_name, location, certificate, thumbprint, managed_certificates_only=managed_certificates_only, private_key_certificates_only=private_key_certificates_only)
raw_parameters = locals()

containerapp_env_certificate_list_decorator = ContainerappEnvCertificateListDecorator(
cmd=cmd,
client=ManagedEnvironmentPreviewClient,
raw_parameters=raw_parameters,
models=CONTAINER_APPS_SDK_MODELS
)
containerapp_env_certificate_list_decorator.validate_subscription_registered(CONTAINER_APPS_RP)
containerapp_env_certificate_list_decorator.validate_arguments()

return containerapp_env_certificate_list_decorator.list()


def upload_certificate(cmd, name, resource_group_name, certificate_file, certificate_name=None, certificate_password=None, location=None, prompt=False):
raw_parameters = locals()

containerapp_env_certificate_upload_decorator = ContainerappEnvCertificateUploadDecorator(
cmd=cmd,
client=ManagedEnvironmentPreviewClient,
raw_parameters=raw_parameters,
models=CONTAINER_APPS_SDK_MODELS
)
containerapp_env_certificate_upload_decorator.validate_subscription_registered(CONTAINER_APPS_RP)
containerapp_env_certificate_upload_decorator.construct_payload()

return containerapp_env_certificate_upload_decorator.create_or_update()


def delete_certificate(cmd, resource_group_name, name, location=None, certificate=None, thumbprint=None):
Expand Down
Loading
Loading