Skip to content

Commit

Permalink
[Key Vault] Add support for key rotation (#20416)
Browse files Browse the repository at this point in the history
  • Loading branch information
mccoyp authored Oct 1, 2021
1 parent 3f2a011 commit f463e0b
Show file tree
Hide file tree
Showing 13 changed files with 1,080 additions and 12 deletions.
4 changes: 4 additions & 0 deletions sdk/keyvault/azure-keyvault-keys/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
## 4.5.0b4 (Unreleased)

### Features Added
- Added support for automated and on-demand key rotation in Azure Key Vault
([#19840](https://github.com/Azure/azure-sdk-for-python/issues/19840))
- Added `KeyClient.rotate_key` to rotate a key on-demand
- Added `KeyClient.update_key_rotation_policy` to update a key's automated rotation policy

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# -------------------------------------
from ._enums import KeyCurveName, KeyExportEncryptionAlgorithm, KeyOperation, KeyType
from ._enums import KeyCurveName, KeyExportEncryptionAlgorithm, KeyOperation, KeyRotationPolicyAction, KeyType
from ._shared.client_base import ApiVersion
from ._models import (
DeletedKey,
JsonWebKey,
KeyProperties,
KeyReleasePolicy,
KeyRotationLifetimeAction,
KeyRotationPolicy,
KeyVaultKey,
KeyVaultKeyIdentifier,
RandomBytes,
Expand All @@ -25,10 +27,13 @@
"KeyCurveName",
"KeyExportEncryptionAlgorithm",
"KeyOperation",
"KeyRotationPolicyAction",
"KeyType",
"DeletedKey",
"KeyProperties",
"KeyReleasePolicy",
"KeyRotationLifetimeAction",
"KeyRotationPolicy",
"RandomBytes",
"ReleaseKeyResult",
]
Expand Down
73 changes: 70 additions & 3 deletions sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ._shared import KeyVaultClientBase
from ._shared.exceptions import error_map as _error_map
from ._shared._polling import DeleteRecoverPollingMethod, KeyVaultOperationPoller
from ._models import DeletedKey, KeyVaultKey, KeyProperties, RandomBytes, ReleaseKeyResult
from ._models import DeletedKey, KeyVaultKey, KeyProperties, KeyRotationPolicy, RandomBytes, ReleaseKeyResult

try:
from typing import TYPE_CHECKING
Expand All @@ -17,7 +17,7 @@

if TYPE_CHECKING:
# pylint:disable=unused-import
from typing import Any, Optional, Union
from typing import Any, Iterable, Optional, Union
from azure.core.paging import ItemPaged
from azure.core.polling import LROPoller
from ._models import JsonWebKey
Expand Down Expand Up @@ -45,7 +45,7 @@ class KeyClient(KeyVaultClientBase):
:dedent: 4
"""

# pylint:disable=protected-access
# pylint:disable=protected-access, too-many-public-methods

def _get_attributes(self, enabled, not_before, expires_on, exportable=None):
"""Return a KeyAttributes object if none-None attributes are provided, or None otherwise"""
Expand Down Expand Up @@ -727,3 +727,70 @@ def get_random_bytes(self, count, **kwargs):
parameters = self._models.GetRandomBytesRequest(count=count)
result = self._client.get_random_bytes(vault_base_url=self._vault_url, parameters=parameters, **kwargs)
return RandomBytes(value=result.value)

@distributed_trace
def get_key_rotation_policy(self, name, **kwargs):
# type: (str, **Any) -> KeyRotationPolicy
"""Get the rotation policy of a Key Vault key.
:param str name: The name of the key.
:return: The key rotation policy.
:rtype: ~azure.keyvault.keys.KeyRotationPolicy
:raises: :class: `~azure.core.exceptions.HttpResponseError`
"""
policy = self._client.get_key_rotation_policy(vault_base_url=self._vault_url, key_name=name, **kwargs)
return KeyRotationPolicy._from_generated(policy)

@distributed_trace
def rotate_key(self, name, **kwargs):
# type: (str, **Any) -> KeyVaultKey
"""Rotate the key based on the key policy by generating a new version of the key.
This operation requires the keys/rotate permission.
:param str name: The name of the key to rotate.
:return: The new version of the rotated key.
:rtype: ~azure.keyvault.keys.KeyVaultKey
:raises: :class:`~azure.core.exceptions.HttpResponseError`
"""
bundle = self._client.rotate_key(vault_base_url=self._vault_url, key_name=name, **kwargs)
return KeyVaultKey._from_key_bundle(bundle)

@distributed_trace
def update_key_rotation_policy(self, name, **kwargs):
# type: (str, **Any) -> KeyRotationPolicy
"""Updates the rotation policy of a Key Vault key.
This operation requires the keys/update permission.
:param str name: The name of the key in the given vault.
:keyword lifetime_actions: Actions that will be performed by Key Vault over the lifetime of a key.
:paramtype lifetime_actions: Iterable[~azure.keyvault.keys.KeyRotationLifetimeAction]
:keyword str expires_in: The expiry time of the policy that will be applied on new key versions, defined as an
ISO 8601 duration. For example: 90 days is "P90D", 3 months is "P3M", and 48 hours is "PT48H".
:return: The updated rotation policy.
:rtype: ~azure.keyvault.keys.KeyRotationPolicy
:raises: :class:`~azure.core.exceptions.HttpResponseError`
"""
lifetime_actions = kwargs.pop("lifetime_actions", None)
if lifetime_actions:
lifetime_actions = [
self._models.LifetimeActions(
action=self._models.LifetimeActionsType(type=action.action),
trigger=self._models.LifetimeActionsTrigger(
time_after_create=action.time_after_create, time_before_expiry=action.time_before_expiry
),
)
for action in lifetime_actions
]

attributes = self._models.KeyRotationPolicyAttributes(expiry_time=kwargs.pop("expires_in", None))
policy = self._models.KeyRotationPolicy(lifetime_actions=lifetime_actions, attributes=attributes)
result = self._client.update_key_rotation_policy(
vault_base_url=self._vault_url, key_name=name, key_rotation_policy=policy
)
return KeyRotationPolicy._from_generated(result)
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ class KeyOperation(with_metaclass(CaseInsensitiveEnumMeta, str, Enum)):
export = "export"


class KeyRotationPolicyAction(with_metaclass(CaseInsensitiveEnumMeta, str, Enum)):
"""The action that will be executed in a key rotation policy"""

ROTATE = "Rotate" #: Rotate the key based on the key policy.
NOTIFY = "Notify" #: Trigger Event Grid events.


class KeyType(with_metaclass(CaseInsensitiveEnumMeta, str, Enum)):
"""Supported key types"""

Expand Down
67 changes: 66 additions & 1 deletion sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing import Any, Dict, Optional, List
from datetime import datetime
from ._generated.v7_0 import models as _models
from ._enums import KeyOperation, KeyType
from ._enums import KeyOperation, KeyRotationPolicyAction, KeyType

KeyOperationResult = namedtuple("KeyOperationResult", ["id", "value"])

Expand Down Expand Up @@ -279,6 +279,71 @@ def __init__(self, value):
self.value = value


class KeyRotationLifetimeAction(object):
"""An action and its corresponding trigger that will be performed by Key Vault over the lifetime of a key.
:param action: The action that will be executed.
:type action: ~azure.keyvault.keys.KeyRotationPolicyAction or str
:keyword str time_after_create: Time after creation to attempt the specified action, as an ISO 8601 duration.
For example, 90 days is "P90D".
:keyword str time_before_expiry: Time before expiry to attempt the specified action, as an ISO 8601 duration.
For example, 90 days is "P90D".
"""

def __init__(self, action, **kwargs):
# type: (KeyRotationPolicyAction, **Any) -> None
self.action = action
self.time_after_create = kwargs.get("time_after_create", None)
self.time_before_expiry = kwargs.get("time_before_expiry", None)

@classmethod
def _from_generated(cls, lifetime_action):
if lifetime_action.trigger:
return cls(
action=lifetime_action.action.type,
time_after_create=lifetime_action.trigger.time_after_create,
time_before_expiry=lifetime_action.trigger.time_before_expiry,
)
return cls(action=lifetime_action.action)


class KeyRotationPolicy(object):
"""The key rotation policy that belongs to a key.
:ivar str id: The identifier of the key rotation policy.
:ivar lifetime_actions: Actions that will be performed by Key Vault over the lifetime of a key.
:type lifetime_actions: list[~azure.keyvault.keys.KeyRotationLifetimeAction]
:ivar str expires_in: The expiry time of the policy that will be applied on new key versions, defined as an ISO
8601 duration. For example, 90 days is "P90D".
:ivar created_on: When the policy was created, in UTC
:type created_on: ~datetime.datetime
:ivar updated_on: When the policy was last updated, in UTC
:type updated_on: ~datetime.datetime
"""

def __init__(self, policy_id, **kwargs):
# type: (str, **Any) -> None
self.id = policy_id
self.lifetime_actions = kwargs.get("lifetime_actions", None)
self.expires_in = kwargs.get("expires_in", None)
self.created_on = kwargs.get("created_on", None)
self.updated_on = kwargs.get("updated_on", None)

@classmethod
def _from_generated(cls, policy):
lifetime_actions = [KeyRotationLifetimeAction._from_generated(action) for action in policy.lifetime_actions] # pylint:disable=protected-access
if policy.attributes:
return cls(
policy_id=policy.id,
lifetime_actions=lifetime_actions,
expires_on=policy.attributes.expiry_time,
created_on=policy.attributes.created,
updated_on=policy.attributes.updated,
)
return cls(policy_id=policy.id, lifetime_actions=lifetime_actions)


class KeyVaultKey(object):
"""A key's attributes and cryptographic material.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,20 @@
from .._shared._polling_async import AsyncDeleteRecoverPollingMethod
from .._shared import AsyncKeyVaultClientBase
from .._shared.exceptions import error_map as _error_map
from .. import DeletedKey, JsonWebKey, KeyProperties, KeyVaultKey, RandomBytes, ReleaseKeyResult
from .. import (
DeletedKey,
JsonWebKey,
KeyProperties,
KeyRotationPolicy,
KeyVaultKey,
RandomBytes,
ReleaseKeyResult,
)

if TYPE_CHECKING:
# pylint:disable=ungrouped-imports
from azure.core.async_paging import AsyncItemPaged
from typing import Any, Optional, Union
from typing import Any, Iterable, Optional, Union
from .. import KeyType


Expand All @@ -42,7 +50,7 @@ class KeyClient(AsyncKeyVaultClientBase):
:dedent: 4
"""

# pylint:disable=protected-access
# pylint:disable=protected-access, too-many-public-methods

def _get_attributes(self, enabled, not_before, expires_on, exportable=None):
"""Return a KeyAttributes object if none-None attributes are provided, or None otherwise"""
Expand Down Expand Up @@ -702,3 +710,67 @@ async def get_random_bytes(self, count: int, **kwargs: "Any") -> RandomBytes:
parameters = self._models.GetRandomBytesRequest(count=count)
result = await self._client.get_random_bytes(vault_base_url=self._vault_url, parameters=parameters, **kwargs)
return RandomBytes(value=result.value)

@distributed_trace_async
async def get_key_rotation_policy(self, name: str, **kwargs: "Any") -> "KeyRotationPolicy":
"""Get the rotation policy of a Key Vault key.
:param str name: The name of the key.
:return: The key rotation policy.
:rtype: ~azure.keyvault.keys.KeyRotationPolicy
:raises: :class:`~azure.core.exceptions.HttpResponseError`
"""
policy = await self._client.get_key_rotation_policy(vault_base_url=self._vault_url, key_name=name, **kwargs)
return KeyRotationPolicy._from_generated(policy)

@distributed_trace_async
async def rotate_key(self, name: str, **kwargs: "Any") -> KeyVaultKey:
"""Rotate the key based on the key policy by generating a new version of the key.
This operation requires the keys/rotate permission.
:param str name: The name of the key to rotate.
:return: The new version of the rotated key.
:rtype: ~azure.keyvault.keys.KeyVaultKey
:raises: :class:`~azure.core.exceptions.HttpResponseError`
"""
bundle = await self._client.rotate_key(vault_base_url=self._vault_url, key_name=name, **kwargs)
return KeyVaultKey._from_key_bundle(bundle)

@distributed_trace_async
async def update_key_rotation_policy(self, name: str, **kwargs: "Any") -> KeyRotationPolicy:
"""Updates the rotation policy of a Key Vault key.
This operation requires the keys/update permission.
:param str name: The name of the key in the given vault.
:keyword lifetime_actions: Actions that will be performed by Key Vault over the lifetime of a key.
:paramtype lifetime_actions: Iterable[~azure.keyvault.keys.KeyRotationLifetimeAction]
:keyword str expires_in: The expiry time of the policy that will be applied on new key versions, defined as an
ISO 8601 duration. For example: 90 days is "P90D", 3 months is "P3M", and 48 hours is "PT48H".
:return: The updated rotation policy.
:rtype: ~azure.keyvault.keys.KeyRotationPolicy
:raises: :class:`~azure.core.exceptions.HttpResponseError`
"""
lifetime_actions = kwargs.pop("lifetime_actions", None)
if lifetime_actions:
lifetime_actions = [
self._models.LifetimeActions(
action=self._models.LifetimeActionsType(type=action.action),
trigger=self._models.LifetimeActionsTrigger(
time_after_create=action.time_after_create, time_before_expiry=action.time_before_expiry
),
)
for action in lifetime_actions
]

attributes = self._models.KeyRotationPolicyAttributes(expiry_time=kwargs.pop("expires_in", None))
policy = self._models.KeyRotationPolicy(lifetime_actions=lifetime_actions, attributes=attributes)
result = await self._client.update_key_rotation_policy(
vault_base_url=self._vault_url, key_name=name, key_rotation_policy=policy
)
return KeyRotationPolicy._from_generated(result)
Loading

0 comments on commit f463e0b

Please sign in to comment.