Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Harness): support user secrets #1176

Merged
merged 17 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 85 additions & 4 deletions ops/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1537,6 +1537,54 @@ def add_model_secret(self, owner: AppUnitOrName, content: Dict[str, str]) -> str
model.Secret._validate_content(content)
return self._backend._secret_add(content, owner_name)

def add_user_secret(self, content: Dict[str, str]) -> str:
"""Add a secret owned by the user, simulating the ``juju add-secret`` command.

Args:
content: A key-value mapping containing the payload of the secret,
for example :code:`{"password": "foo123"}`.

Return:
The ID of the newly-added secret.

Example usage (the parameter ``harness`` in the test function is
a pytest fixture that does setup/teardown, see :class:`Harness`)::

# charmcraft.yaml
config:
options:
mysec:
type: secret
description: "tell me your secrets"

# charm.py
class MyVMCharm(ops.CharmBase):
def __init__(self, framework: ops.Framework):
super().__init__(framework)
framework.observe(self.on.config_changed, self._on_config_changed)

def _on_config_changed(self, event: ops.ConfigChangedEvent):
mysec = self.config.get('mysec')
if mysec:
sec = self.model.get_secret(id=mysec, label="mysec")
self.config_from_secret = sec.get_content()

# test_charm.py
def test_config_changed(harness):
secret_content = {'password': 'foo'}
secret_id = harness.add_user_secret(secret_content)
harness.grant_secret(secret_id, 'test-charm')
harness.begin()
harness.update_config({'mysec': secret_id})
secret = harness.model.get_secret(id=secret_id).get_content()
assert harness.charm.config_from_secret == secret.get_content()

"""
model.Secret._validate_content(content)
# Although it's named a user-owned secret in Juju, technically, the owner is the
# Model, so the secret's owner is set to `Model.uuid`.
return self._backend._secret_add(content, self.model.uuid)

def _ensure_secret(self, secret_id: str) -> '_Secret':
secret = self._backend._get_secret(secret_id)
if secret is None:
Expand Down Expand Up @@ -1566,6 +1614,9 @@ def set_secret_content(self, secret_id: str, content: Dict[str, str]):
def grant_secret(self, secret_id: str, observer: AppUnitOrName):
"""Grant read access to this secret for the given observer application or unit.

For user secrets, grant access to the application, simulating the
``juju grant-secret`` command.

If the given application or unit has already been granted access to
this secret, do nothing.

Expand All @@ -1577,10 +1628,17 @@ def grant_secret(self, secret_id: str, observer: AppUnitOrName):
under test must already have been created.
"""
secret = self._ensure_secret(secret_id)
app_or_unit_name = _get_app_or_unit_name(observer)

# User secrets:
if secret.owner_name == self.model.uuid:
secret.user_secrets_grants.add(app_or_unit_name)
IronCore864 marked this conversation as resolved.
Show resolved Hide resolved
return

# Model secrets:
if secret.owner_name in [self.model.app.name, self.model.unit.name]:
raise RuntimeError(f'Secret {secret_id!r} owned by the charm under test, "'
f"can't call grant_secret")
app_or_unit_name = _get_app_or_unit_name(observer)
relation_id = self._secret_relation_id_to(secret)
if relation_id not in secret.grants:
secret.grants[relation_id] = set()
Expand All @@ -1600,10 +1658,18 @@ def revoke_secret(self, secret_id: str, observer: AppUnitOrName):
test must have already been created.
"""
secret = self._ensure_secret(secret_id)
app_or_unit_name = _get_app_or_unit_name(observer)

# User secrets:
if secret.owner_name == self.model.uuid:
secret.user_secrets_grants.discard(app_or_unit_name)
return

# Model secrets:
if secret.owner_name in [self.model.app.name, self.model.unit.name]:
raise RuntimeError(f'Secret {secret_id!r} owned by the charm under test, "'
f"can't call revoke_secret")
app_or_unit_name = _get_app_or_unit_name(observer)

relation_id = self._secret_relation_id_to(secret)
if relation_id not in secret.grants:
return
Expand Down Expand Up @@ -1650,6 +1716,8 @@ def trigger_secret_rotation(self, secret_id: str, *, label: Optional[str] = None
label is used.
"""
secret = self._ensure_secret(secret_id)
if secret.owner_name == self.model.uuid:
raise RuntimeError("Cannot trigger the secret-rotate event for a user secret.")
if label is None:
label = secret.label
self.charm.on.secret_rotate.emit(secret_id, label)
Expand Down Expand Up @@ -1690,6 +1758,8 @@ def trigger_secret_expiration(self, secret_id: str, revision: int, *,
label is used.
"""
secret = self._ensure_secret(secret_id)
if secret.owner_name == self.model.uuid:
raise RuntimeError("Cannot trigger the secret-expired event for a user secret.")
if label is None:
label = secret.label
self.charm.on.secret_expired.emit(secret_id, label, revision)
Expand Down Expand Up @@ -2113,6 +2183,7 @@ class _Secret:
description: Optional[str] = None
tracked: int = 1
grants: Dict[int, Set[str]] = dataclasses.field(default_factory=dict)
user_secrets_grants: Set[str] = dataclasses.field(default_factory=set)


@_copy_docstrings(model._ModelBackend)
Expand Down Expand Up @@ -2535,8 +2606,14 @@ def secret_get(self, *,
peek: bool = False) -> Dict[str, str]:
secret = self._ensure_secret_id_or_label(id, label)

# Check that caller has permission to get this secret
if secret.owner_name not in [self.app_name, self.unit_name]:
if secret.owner_name == self.model_uuid:
# This is a user secret - charms only ever have view access.
if self.app_name not in secret.user_secrets_grants:
raise model.SecretNotFoundError(
f'Secret {id!r} not granted access to {self.app_name!r}')
elif secret.owner_name not in [self.app_name, self.unit_name]:
# This is a model secret - the model might have admin or view access.
# Check that caller has permission to get this secret
# Observer is calling: does secret have a grant on relation between
# this charm (the observer) and the secret owner's app?
owner_app = secret.owner_name.split('/')[0]
Expand Down Expand Up @@ -2572,6 +2649,10 @@ def _ensure_secret_owner(self, secret: _Secret):
# secrets, the leader has manage permissions and other units only have
# view permissions.
# https://discourse.charmhub.io/t/secret-access-permissions/12627
# For user secrets the secret owner is the model, that is,
# `secret.owner_name == self.model.uuid`, only model admins have
# manage permissions: https://juju.is/docs/juju/secret.

unit_secret = secret.owner_name == self.unit_name
app_secret = secret.owner_name == self.app_name

Expand Down
90 changes: 88 additions & 2 deletions test/test_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1103,8 +1103,7 @@ def test_config_secret_option(self):
''')
self.addCleanup(harness.cleanup)
harness.begin()
# [jam] I don't think this is right, as user-secrets aren't owned by the app
secret_id = harness.add_model_secret('mycharm', {'key': 'value'})
secret_id = harness.add_user_secret({'key': 'value'})
harness.update_config(key_values={'a': secret_id})
self.assertEqual(harness.charm.changes,
[{'name': 'config-changed', 'data': {'a': secret_id}}])
Expand Down Expand Up @@ -5100,6 +5099,17 @@ def test_trigger_secret_rotation(self):
with self.assertRaises(RuntimeError):
harness.trigger_secret_rotation('nosecret')

def test_trigger_secret_rotation_on_user_secret(self):
harness = ops.testing.Harness(EventRecorder, meta='name: database')
self.addCleanup(harness.cleanup)

secret_id = harness.add_user_secret({'foo': 'bar'})
assert secret_id is not None
harness.begin()

with self.assertRaises(RuntimeError):
harness.trigger_secret_rotation(secret_id)

def test_trigger_secret_removal(self):
harness = ops.testing.Harness(EventRecorder, meta='name: database')
self.addCleanup(harness.cleanup)
Expand Down Expand Up @@ -5156,6 +5166,17 @@ def test_trigger_secret_expiration(self):
with self.assertRaises(RuntimeError):
harness.trigger_secret_removal('nosecret', 1)

def test_trigger_secret_expiration_on_user_secret(self):
harness = ops.testing.Harness(EventRecorder, meta='name: database')
self.addCleanup(harness.cleanup)

secret_id = harness.add_user_secret({'foo': 'bar'})
assert secret_id is not None
harness.begin()

with self.assertRaises(RuntimeError):
harness.trigger_secret_expiration(secret_id, 1)

def test_secret_permissions_unit(self):
harness = ops.testing.Harness(ops.CharmBase, meta='name: database')
self.addCleanup(harness.cleanup)
Expand Down Expand Up @@ -5202,6 +5223,71 @@ def test_secret_permissions_nonleader(self):
with self.assertRaises(ops.model.SecretNotFoundError):
secret.remove_all_revisions()

def test_add_user_secret(self):
harness = ops.testing.Harness(ops.CharmBase, meta=yaml.safe_dump(
{'name': 'webapp'}
))
self.addCleanup(harness.cleanup)
harness.begin()

secret_content = {'password': 'foo'}
secret_id = harness.add_user_secret(secret_content)
harness.grant_secret(secret_id, 'webapp')

secret = harness.model.get_secret(id=secret_id)
self.assertEqual(secret.id, secret_id)
self.assertEqual(secret.get_content(), secret_content)

def test_get_user_secret_without_grant(self):
harness = ops.testing.Harness(ops.CharmBase, meta=yaml.safe_dump(
{'name': 'webapp'}
))
self.addCleanup(harness.cleanup)
harness.begin()
secret_id = harness.add_user_secret({'password': 'foo'})
with self.assertRaises(ops.SecretNotFoundError):
harness.model.get_secret(id=secret_id)

def test_revoke_user_secret(self):
harness = ops.testing.Harness(ops.CharmBase, meta=yaml.safe_dump(
{'name': 'webapp'}
))
self.addCleanup(harness.cleanup)
harness.begin()

secret_content = {'password': 'foo'}
secret_id = harness.add_user_secret(secret_content)
harness.grant_secret(secret_id, 'webapp')
harness.revoke_secret(secret_id, 'webapp')
with self.assertRaises(ops.SecretNotFoundError):
harness.model.get_secret(id=secret_id)

def test_set_user_secret_content(self):
harness = ops.testing.Harness(EventRecorder, meta=yaml.safe_dump(
{'name': 'webapp'}
))
self.addCleanup(harness.cleanup)
harness.begin()
secret_id = harness.add_user_secret({'password': 'foo'})
harness.grant_secret(secret_id, 'webapp')
secret = harness.model.get_secret(id=secret_id)
self.assertEqual(secret.get_content(), {'password': 'foo'})
harness.set_secret_content(secret_id, {'password': 'bar'})
secret = harness.model.get_secret(id=secret_id)
self.assertEqual(secret.get_content(refresh=True), {'password': 'bar'})

def test_get_user_secret_info(self):
harness = ops.testing.Harness(EventRecorder, meta=yaml.safe_dump(
{'name': 'webapp'}
))
self.addCleanup(harness.cleanup)
harness.begin()
secret_id = harness.add_user_secret({'password': 'foo'})
harness.grant_secret(secret_id, 'webapp')
secret = harness.model.get_secret(id=secret_id)
with self.assertRaises(ops.SecretNotFoundError):
secret.get_info()


class EventRecorder(ops.CharmBase):
def __init__(self, framework: ops.Framework):
Expand Down
Loading