From 261bdd120688db10e4f58d96ff0d976005822e85 Mon Sep 17 00:00:00 2001 From: Ben Hoyt Date: Fri, 2 Jun 2023 17:21:26 +1200 Subject: [PATCH 1/5] Fix sphinx-build warnings and update some types accordingly --- .readthedocs.yaml | 3 +- docs/conf.py | 47 ++++++++-- ops/__init__.py | 4 +- ops/charm.py | 192 +++++++++++++--------------------------- ops/framework.py | 124 ++++++++++++-------------- ops/main.py | 4 +- ops/model.py | 219 ++++++++++++++++++++++------------------------ ops/pebble.py | 146 ++++++++++++++++--------------- ops/testing.py | 23 +++-- tox.ini | 2 +- 10 files changed, 354 insertions(+), 410 deletions(-) diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 9ff727bc6..f18916664 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -10,4 +10,5 @@ build: python: "3.11" sphinx: - configuration: docs/conf.py + configuration: docs/conf.py + fail_on_warning: true diff --git a/docs/conf.py b/docs/conf.py index 1d48f0387..0c5d5a1bb 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -11,6 +11,20 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) +# Pull in fix from https://github.com/sphinx-doc/sphinx/pull/11222/files to fix +# "invalid signature for autoattribute ('ops.pebble::ServiceDict.backoff-delay')" +import re +import sphinx.ext.autodoc +sphinx.ext.autodoc.py_ext_sig_re = re.compile( + r'''^ ([\w.]+::)? # explicit module name + ([\w.]+\.)? # module and/or class name(s) + ([^.()]+) \s* # thing name + (?: \((.*)\) # optional: arguments + (?:\s* -> \s* (.*))? # return annotation + )? $ # and nothing more + ''', re.VERBOSE) + + # -- Project information ----------------------------------------------------- project = 'The Operator Framework' @@ -29,12 +43,33 @@ # domain name if present. Example entries would be ('py:func', 'int') or # ('envvar', 'LD_LIBRARY_PATH'). nitpick_ignore = [ - ('py:class', 'TextIO'), # typing.TextIO confuses the nitpicker - ('py:class', 'method'), # types.Method confuses the nitpicker - ('py:class', '_ModelBackend'), # private - ('py:class', '_ModelCache'), # private - ('py:class', 'ipaddress.ip_address'), # fake (AFAIK there is no ABC) - ('py:class', 'ipaddress.ip_network'), # ditto + ('py:class', 'ops.model._ModelBackend'), + ('py:class', 'ops.model._ModelCache'), + ('py:class', '_AddressDict'), + ('py:class', '_NetworkDict'), + ('py:class', '_RelationMetaDict'), + ('py:class', '_ResourceMetaDict'), + ('py:class', '_StorageMetaDict'), + ('py:class', '_ChangeData'), + ('py:class', '_ChangeDict'), + ('py:class', '_InfoDict'), + ('py:class', '_IOSource'), + ('py:class', '_TextOrBinaryIO'), + ('py:class', '_Readable'), + ('py:class', '_Writeable'), + ('py:class', '_WebSocket'), + ('py:class', '_FileInfoDict'), + ('py:class', '_PlanDict'), + ('py:class', '_ServiceInfoDict'), + ('py:class', '_SystemInfoDict'), + ('py:class', '_TaskData'), + ('py:class', '_TaskDict'), + ('py:class', '_ProgressDict'), + ('py:class', '_WarningDict'), + ('py:class', 'ops.storage.SQLiteStorage'), + ('py:class', 'ops.storage.JujuStorage'), + ('py:class', 'ops.testing.CharmType'), + ('py:obj', 'ops.testing.CharmType'), ] # Add any Sphinx extension module names here, as strings. They can be diff --git a/ops/__init__.py b/ops/__init__.py index 064bcaf62..6bcb65805 100644 --- a/ops/__init__.py +++ b/ops/__init__.py @@ -105,6 +105,7 @@ 'ObjectEvents', 'PreCommitEvent', 'PrefixedEvents', + 'Serializable', 'StoredDict', 'StoredList', 'StoredSet', @@ -174,7 +175,7 @@ # This allows "import ops; ops.main(Charm)" to work as expected. from . import main # type: ignore # noqa: F401 -# Explicitly import names from sub-modules so users can just "import ops" and +# Explicitly import names from submodules so users can just "import ops" and # then use them as "ops.X". from .charm import ( # noqa: F401 ActionEvent, @@ -236,6 +237,7 @@ ObjectEvents, PreCommitEvent, PrefixedEvents, + Serializable, StoredDict, StoredList, StoredSet, diff --git a/ops/charm.py b/ops/charm.py index 6698219c8..fc2d4d636 100755 --- a/ops/charm.py +++ b/ops/charm.py @@ -36,18 +36,8 @@ if TYPE_CHECKING: from typing_extensions import Literal, Required, TypedDict - from ops.framework import Handle, JsonObject, _SerializedData - from ops.model import Container, Numerical, Relation, Storage - - # CharmMeta also needs these. - _ActionParam = Dict[str, 'JsonObject'] # - _ActionMetaDict = TypedDict( - '_ActionMetaDict', { - 'title': str, - 'description': str, - 'params': Dict[str, _ActionParam], - 'required': List[str]}, - total=False) + from ops.framework import Handle + from ops.model import Container, Relation, Storage _Scopes = Literal['global', 'container'] _RelationMetaDict = TypedDict( @@ -76,55 +66,10 @@ 'description': str}, total=False) - _PayloadMetaDict = TypedDict('_PayloadMetaDict', {'type': str}) - _MountDict = TypedDict( '_MountDict', {'storage': Required[str], 'location': str}, total=False) - _ContainerMetaDict = TypedDict( - '_ContainerMetaDict', {'mounts': List[_MountDict]}) - - _CharmMetaDict = TypedDict( - '_CharmMetaDict', { # all are optional - 'name': Required[str], - 'summary': Required[str], - 'description': Required[str], - 'maintainer': str, - 'maintainers': List[str], - 'tags': List[str], - 'terms': List[str], - 'series': List[str], - 'subordinate': bool, - 'min-juju-version': str, - 'requires': Dict[str, '_RelationMetaDict'], - 'provides': Dict[str, '_RelationMetaDict'], - 'peers': Dict[str, '_RelationMetaDict'], - 'storage': Dict[str, '_StorageMetaDict'], - 'resources': Dict[str, '_ResourceMetaDict'], - 'payloads': Dict[str, '_PayloadMetaDict'], - 'extra-bindings': Dict[str, Any], # fixme: _BindingDict? - 'containers': Dict[str, '_ContainerMetaDict'] - }, total=False) - - # can't put in *Event because *Event.snapshot needs it. - _WorkloadEventSnapshot = TypedDict('_WorkloadEventSnapshot', { - 'container_name': str - }, total=False) - - _RelationDepartedEventSnapshot = TypedDict('_RelationDepartedEventSnapshot', { - 'relation_name': str, - 'relation_id': int, - 'app_name': Optional[str], - 'unit_name': Optional[str], - 'departing_unit': Optional[str] - }, total=False) - - _StorageEventSnapshot = TypedDict('_StorageEventSnapshot', { - 'storage_name': str, - 'storage_index': int, - 'storage_location': str, - }, total=False) class HookEvent(EventBase): @@ -171,7 +116,7 @@ def defer(self): """ raise RuntimeError('cannot defer action events') - def restore(self, snapshot: 'JsonObject'): + def restore(self, snapshot: Dict[str, Any]): """Used by the operator framework to record the action. Not meant to be called directly by charm code. @@ -186,7 +131,7 @@ def restore(self, snapshot: 'JsonObject'): # the model is not available in __init__. self.params = self.framework.model._backend.action_get() - def set_results(self, results: '_SerializedData'): + def set_results(self, results: Dict[str, Any]): """Report the result of the action. Juju eventually only accepts a str:str mapping, so we will attempt @@ -397,7 +342,7 @@ class CollectMetricsEvent(HookEvent): how they can interact with Juju. """ - def add_metrics(self, metrics: Mapping[str, 'Numerical'], + def add_metrics(self, metrics: Mapping[str, Union[int, float]], labels: Optional[Mapping[str, str]] = None): """Record metrics that have been gathered by the charm for this unit. @@ -429,13 +374,6 @@ class RelationEvent(HookEvent): :class:`~ops.model.Application` level event """ - if TYPE_CHECKING: - _RelationEventSnapshot = TypedDict('_RelationEventSnapshot', { - 'relation_name': Required[str], - 'relation_id': Required[int], - 'app_name': Optional[str], - 'unit_name': Optional[str] - }, total=False) def __init__(self, handle: 'Handle', relation: 'Relation', app: Optional[model.Application] = None, @@ -450,12 +388,12 @@ def __init__(self, handle: 'Handle', relation: 'Relation', self.app = app self.unit = unit - def snapshot(self) -> '_RelationEventSnapshot': + def snapshot(self) -> Dict[str, Any]: """Used by the framework to serialize the event to disk. Not meant to be called by charm code. """ - snapshot: 'RelationEvent._RelationEventSnapshot' = { + snapshot: Dict[str, Any] = { 'relation_name': self.relation.name, 'relation_id': self.relation.id, } @@ -465,7 +403,7 @@ def snapshot(self) -> '_RelationEventSnapshot': snapshot['unit_name'] = self.unit.name return snapshot - def restore(self, snapshot: '_RelationEventSnapshot'): + def restore(self, snapshot: Dict[str, Any]): """Used by the framework to deserialize the event from disk. Not meant to be called by charm code. @@ -560,12 +498,12 @@ def __init__(self, handle: 'Handle', relation: 'Relation', self._departing_unit_name = departing_unit_name - def snapshot(self) -> '_RelationDepartedEventSnapshot': + def snapshot(self) -> Dict[str, Any]: """Used by the framework to serialize the event to disk. Not meant to be called by charm code. """ - snapshot = cast('_RelationDepartedEventSnapshot', super().snapshot()) + snapshot = super().snapshot() if self._departing_unit_name: snapshot['departing_unit'] = self._departing_unit_name return snapshot @@ -583,13 +521,12 @@ def departing_unit(self) -> Optional[model.Unit]: return None return self.framework.model.get_unit(self._departing_unit_name) - def restore(self, snapshot: '_RelationDepartedEventSnapshot'): + def restore(self, snapshot: Dict[str, Any]): """Used by the framework to deserialize the event from disk. Not meant to be called by charm code. """ - super().restore(snapshot) # type: ignore - + super().restore(snapshot) self._departing_unit_name = snapshot.get('departing_unit') @@ -625,19 +562,19 @@ def __init__(self, handle: 'Handle', storage: 'Storage'): super().__init__(handle) self.storage = storage - def snapshot(self) -> '_StorageEventSnapshot': + def snapshot(self) -> Dict[str, Any]: """Used by the framework to serialize the event to disk. Not meant to be called by charm code. """ - snapshot: '_StorageEventSnapshot' = {} + snapshot: Dict[str, Any] = {} if isinstance(self.storage, model.Storage): snapshot["storage_name"] = self.storage.name snapshot["storage_index"] = self.storage.index snapshot["storage_location"] = str(self.storage.location) return snapshot - def restore(self, snapshot: '_StorageEventSnapshot'): + def restore(self, snapshot: Dict[str, Any]): """Used by the framework to deserialize the event from disk. Not meant to be called by charm code. @@ -711,17 +648,17 @@ def __init__(self, handle: 'Handle', workload: 'Container'): self.workload = workload - def snapshot(self) -> '_WorkloadEventSnapshot': + def snapshot(self) -> Dict[str, Any]: """Used by the framework to serialize the event to disk. Not meant to be called by charm code. """ - snapshot: "_WorkloadEventSnapshot" = {} + snapshot: Dict[str, Any] = {} if isinstance(self.workload, model.Container): snapshot['container_name'] = self.workload.name return snapshot - def restore(self, snapshot: '_WorkloadEventSnapshot'): + def restore(self, snapshot: Dict[str, Any]): """Used by the framework to deserialize the event from disk. Not meant to be called by charm code. @@ -760,14 +697,14 @@ def secret(self) -> model.Secret: backend = self.framework.model._backend return model.Secret(backend=backend, id=self._id, label=self._label) - def snapshot(self) -> '_SerializedData': + def snapshot(self) -> Dict[str, Any]: """Used by the framework to serialize the event to disk. Not meant to be called by charm code. """ return {'id': self._id, 'label': self._label} - def restore(self, snapshot: '_SerializedData'): + def restore(self, snapshot: Dict[str, Any]): """Used by the framework to deserialize the event from disk. Not meant to be called by charm code. @@ -784,7 +721,7 @@ class SecretChangedEvent(SecretEvent): secret will be notified via this event that a new revision is available. Typically, you will want to fetch the new content by calling - :meth:`ops.model.Secret.get_content` with :code:`refresh=True` to tell Juju to + :meth:`ops.Secret.get_content` with :code:`refresh=True` to tell Juju to start tracking the new revision. """ @@ -794,7 +731,7 @@ class SecretRotateEvent(SecretEvent): This event is fired on the secret owner to inform it that the secret must be rotated. The event will keep firing until the owner creates a new - revision by calling :meth:`ops.model.Secret.set_content`. + revision by calling :meth:`ops.Secret.set_content`. """ def defer(self): @@ -811,7 +748,7 @@ class SecretRemoveEvent(SecretEvent): observers have updated to that new revision, this event will be fired to inform the secret owner that the old revision can be removed. - Typically, you will want to call :meth:`ops.model.Secret.remove_revision` to + Typically, you will want to call :meth:`ops.Secret.remove_revision` to remove the now-unused revision. """ @@ -824,7 +761,7 @@ def revision(self) -> int: """The secret revision this event refers to.""" return self._revision - def snapshot(self) -> '_SerializedData': + def snapshot(self) -> Dict[str, Any]: """Used by the framework to serialize the event to disk. Not meant to be called by charm code. @@ -833,7 +770,7 @@ def snapshot(self) -> '_SerializedData': data['revision'] = self._revision return data - def restore(self, snapshot: '_SerializedData'): + def restore(self, snapshot: Dict[str, Any]): """Used by the framework to deserialize the event from disk. Not meant to be called by charm code. @@ -847,7 +784,7 @@ class SecretExpiredEvent(SecretEvent): This event is fired on the secret owner to inform it that the secret revision must be removed. The event will keep firing until the owner removes the - revision by calling :meth:`model.Secret.remove_revision()`. + revision by calling :meth:`ops.Secret.remove_revision()`. """ def __init__(self, handle: 'Handle', id: str, label: Optional[str], revision: int): @@ -859,7 +796,7 @@ def revision(self) -> int: """The secret revision this event refers to.""" return self._revision - def snapshot(self) -> '_SerializedData': + def snapshot(self) -> Dict[str, Any]: """Used by the framework to serialize the event to disk. Not meant to be called by charm code. @@ -868,7 +805,7 @@ def snapshot(self) -> '_SerializedData': data['revision'] = self._revision return data - def restore(self, snapshot: '_SerializedData'): + def restore(self, snapshot: Dict[str, Any]): """Used by the framework to deserialize the event from disk. Not meant to be called by charm code. @@ -1079,58 +1016,53 @@ class is mostly for the framework to understand what the charm has defined. actions_raw: a mapping containing the contents of actions.yaml """ - if TYPE_CHECKING: - # avoid long line in init - _ActionsRaw = Optional[Dict[str, '_ActionMetaDict']] - - def __init__(self, - raw: Optional['_CharmMetaDict'] = None, # type: ignore - actions_raw: '_ActionsRaw' = None # type: ignore - ): - raw: _CharmMetaDict = raw or cast('_CharmMetaDict', {}) - actions_raw: Dict[str, _ActionMetaDict] = actions_raw or {} - - self.name = raw.get('name', '') - self.summary = raw.get('summary', '') - self.description = raw.get('description', '') + + def __init__(self, raw: Optional[Dict[str, Any]] = None, + actions_raw: Optional[Dict[str, Any]] = None): + raw_: Dict[str, Any] = raw or {} + actions_raw_: Dict[str, Any] = actions_raw or {} + + self.name = raw_.get('name', '') + self.summary = raw_.get('summary', '') + self.description = raw_.get('description', '') self.maintainers: List[str] = [] - if 'maintainer' in raw: - self.maintainers.append(raw['maintainer']) - if 'maintainers' in raw: - self.maintainers.extend(raw['maintainers']) - self.tags = raw.get('tags', []) - self.terms = raw.get('terms', []) - self.series = raw.get('series', []) - self.subordinate = raw.get('subordinate', False) - self.min_juju_version = raw.get('min-juju-version') + if 'maintainer' in raw_: + self.maintainers.append(raw_['maintainer']) + if 'maintainers' in raw_: + self.maintainers.extend(raw_['maintainers']) + self.tags = raw_.get('tags', []) + self.terms = raw_.get('terms', []) + self.series = raw_.get('series', []) + self.subordinate = raw_.get('subordinate', False) + self.min_juju_version = raw_.get('min-juju-version') self.requires = {name: RelationMeta(RelationRole.requires, name, rel) - for name, rel in raw.get('requires', {}).items()} + for name, rel in raw_.get('requires', {}).items()} self.provides = {name: RelationMeta(RelationRole.provides, name, rel) - for name, rel in raw.get('provides', {}).items()} + for name, rel in raw_.get('provides', {}).items()} self.peers = {name: RelationMeta(RelationRole.peer, name, rel) - for name, rel in raw.get('peers', {}).items()} + for name, rel in raw_.get('peers', {}).items()} self.relations: Dict[str, RelationMeta] = {} self.relations.update(self.requires) self.relations.update(self.provides) self.relations.update(self.peers) self.storages = {name: StorageMeta(name, storage) - for name, storage in raw.get('storage', {}).items()} + for name, storage in raw_.get('storage', {}).items()} self.resources = {name: ResourceMeta(name, res) - for name, res in raw.get('resources', {}).items()} + for name, res in raw_.get('resources', {}).items()} self.payloads = {name: PayloadMeta(name, payload) - for name, payload in raw.get('payloads', {}).items()} - self.extra_bindings = raw.get('extra-bindings', {}) - self.actions = {name: ActionMeta(name, action) for name, action in actions_raw.items()} + for name, payload in raw_.get('payloads', {}).items()} + self.extra_bindings = raw_.get('extra-bindings', {}) + self.actions = {name: ActionMeta(name, action) for name, action in actions_raw_.items()} # This is taken from Charm Metadata v2, but only the "containers" and # "containers.name" fields that we need right now for Pebble. See: # https://discourse.charmhub.io/t/charm-metadata-v2/3674 self.containers = {name: ContainerMeta(name, container) - for name, container in raw.get('containers', {}).items()} + for name, container in raw_.get('containers', {}).items()} @classmethod def from_yaml( cls, metadata: Union[str, TextIO], - actions: Optional[Union[str, TextIO]] = None): + actions: Optional[Union[str, TextIO]] = None) -> 'CharmMeta': """Instantiate a CharmMeta from a YAML description of metadata.yaml. Args: @@ -1138,10 +1070,10 @@ def from_yaml( This can be a simple string, or a file-like object. (passed to `yaml.safe_load`). actions: YAML description of Actions for this charm (eg actions.yaml) """ - meta = cast('_CharmMetaDict', yaml.safe_load(metadata)) + meta = yaml.safe_load(metadata) raw_actions = {} if actions is not None: - raw_actions = cast(Dict[str, '_ActionMetaDict'], yaml.safe_load(actions)) + raw_actions = cast(Dict[str, Any], yaml.safe_load(actions)) if raw_actions is None: raw_actions = {} return cls(meta, raw_actions) @@ -1210,7 +1142,7 @@ class StorageMeta: storage_name: Name of storage type: Storage type description: A text description of the storage - read_only: Whether or not the storage is read only + read_only: Whether the storage is read only minimum_size: Minimum size of storage location: Mount point of storage multiple_range: Range of numeric qualifiers when multiple storage units are used @@ -1258,7 +1190,7 @@ class PayloadMeta: type: Payload type """ - def __init__(self, name: str, raw: '_PayloadMetaDict'): + def __init__(self, name: str, raw: Dict[str, Any]): self.payload_name = name self.type = raw['type'] @@ -1266,7 +1198,7 @@ def __init__(self, name: str, raw: '_PayloadMetaDict'): class ActionMeta: """Object containing metadata about an action's definition.""" - def __init__(self, name: str, raw: Optional['_ActionMetaDict'] = None): + def __init__(self, name: str, raw: Optional[Dict[str, Any]] = None): raw = raw or {} self.name = name self.title = raw.get('title', '') @@ -1285,7 +1217,7 @@ class ContainerMeta: name: Name of container (key in the YAML) """ - def __init__(self, name: str, raw: '_ContainerMetaDict'): + def __init__(self, name: str, raw: Dict[str, Any]): self.name = name self._mounts: Dict[str, ContainerStorageMeta] = {} diff --git a/ops/framework.py b/ops/framework.py index 6057f3558..0068ac07a 100755 --- a/ops/framework.py +++ b/ops/framework.py @@ -34,13 +34,13 @@ Any, Callable, Dict, - Generic, Hashable, Iterable, List, Optional, Set, Tuple, + Type, TypeVar, Union, ) @@ -48,50 +48,37 @@ from ops import charm from ops.storage import JujuStorage, NoSnapshotError, SQLiteStorage -if TYPE_CHECKING: - from pathlib import Path - from typing import Literal, Protocol, Type - from ops.charm import CharmMeta - from ops.model import JsonObject, Model, _ModelBackend +class Serializable(typing.Protocol): + """The type returned by :meth:`Framework.load_snapshot`.""" - class _Serializable(Protocol): - handle_kind = '' - @property - def handle(self) -> 'Handle': ... # noqa - @handle.setter - def handle(self, val: 'Handle'): ... # noqa - def snapshot(self) -> Dict[str, '_StorableType']: ... # noqa - def restore(self, snapshot: Dict[str, '_StorableType']) -> None: ... # noqa + handle_kind = '' - class _StoredObject(Protocol): - _under: Any = None # noqa + @property + def handle(self) -> 'Handle': ... # noqa + @handle.setter + def handle(self, val: 'Handle'): ... # noqa + def snapshot(self) -> Dict[str, Any]: ... # noqa + def restore(self, snapshot: Dict[str, Any]) -> None: ... # noqa - # serialized data structure - _SerializedData = Dict[str, 'JsonObject'] - _ObserverCallback = Callable[[Any], None] +if TYPE_CHECKING: + from typing import Literal, Protocol - # types that can be stored natively - _StorableType = Union[int, bool, float, str, bytes, Literal[None], - List['_StorableType'], - Dict[str, '_StorableType'], - Set['_StorableType']] + from ops.charm import CharmMeta + from ops.model import Model, _ModelBackend - StoredObject = Union['StoredList', 'StoredSet', 'StoredDict'] + class _StoredObject(Protocol): + _under: Any = None # noqa - # This type is used to denote either a Handle instance or an instance of - # an Object (or subclass). This is used by methods and classes which can be - # called with either of those (they need a Handle, but will accept an Object - # from which they will then extract the Handle). - _ParentHandle = Union['Handle', 'Object'] + StoredObject = Union['StoredList', 'StoredSet', 'StoredDict'] _Path = _Kind = _MethodName = _EventKey = str # used to type Framework Attributes _ObserverPath = List[Tuple[_Path, _MethodName, _Path, _EventKey]] _ObjectPath = Tuple[Optional[_Path], _Kind] _PathToObjectMapping = Dict[_Path, 'Object'] - _PathToSerializableMapping = Dict[_Path, _Serializable] + _PathToSerializableMapping = Dict[_Path, Serializable] _T = TypeVar("_T") _EventType = TypeVar('_EventType', bound='EventBase') @@ -258,14 +245,14 @@ def defer(self): logger.debug("Deferring %s.", self) self.deferred = True - def snapshot(self) -> '_SerializedData': + def snapshot(self) -> Dict[str, Any]: """Return the snapshot data that should be persisted. Subclasses must override to save any custom state. """ return {} - def restore(self, snapshot: '_SerializedData'): + def restore(self, snapshot: Dict[str, Any]): """Restore the value state from the given snapshot. Subclasses must override to restore their custom state. @@ -273,7 +260,7 @@ def restore(self, snapshot: '_SerializedData'): self.deferred = False -class EventSource(Generic[_EventType]): +class EventSource: """EventSource wraps an event type with a descriptor to facilitate observing and emitting. It is generally used as: @@ -288,11 +275,11 @@ class SomeObject(Object): attribute which is a BoundEvent and may be used to emit and observe the event. """ - def __init__(self, event_type: 'Type[_EventType]'): + def __init__(self, event_type: 'Type[EventBase]'): if not isinstance(event_type, type) or not issubclass(event_type, EventBase): raise RuntimeError( f'Event requires a subclass of EventBase as an argument, got {event_type}') - self.event_type: Type[_EventType] = event_type + self.event_type: Type[EventBase] = event_type self.event_kind: Optional[str] = None self.emitter_type: Optional[Type[Object]] = None @@ -312,7 +299,7 @@ def __set_name__(self, emitter_type: 'Type[Object]', event_kind: str): def __get__(self, emitter: Optional['Object'], emitter_type: 'Type[Object]' - ) -> 'BoundEvent[_EventType]': + ) -> 'BoundEvent': if emitter is None: return self # type: ignore # Framework might not be available if accessed as CharmClass.on.event @@ -324,7 +311,7 @@ def __get__(self, emitter: Optional['Object'], return BoundEvent(emitter, self.event_type, self.event_kind) -class BoundEvent(Generic[_EventType]): +class BoundEvent: """Event bound to an Object.""" def __repr__(self): @@ -489,7 +476,7 @@ def _event_kinds(self) -> List[str]: event_kinds.append(attr_name) return event_kinds - def events(self) -> Dict[str, EventSource[EventBase]]: + def events(self) -> Dict[str, EventSource]: """Return a mapping of event_kinds to bound_events for all available events.""" return {event_kind: getattr(self, event_kind) for event_kind in self._event_kinds()} @@ -509,7 +496,7 @@ def __init__(self, emitter: Object, key: str): self._emitter = emitter self._prefix = key.replace('-', '_') + '_' - def __getattr__(self, name: str) -> BoundEvent[Any]: + def __getattr__(self, name: str) -> BoundEvent: return getattr(self._emitter, self._prefix + name) @@ -561,7 +548,7 @@ class Framework(Object): # Override properties from Object so that we can set them in __init__. model: 'Model' = None meta: 'CharmMeta' = None - charm_dir: 'Path' = None + charm_dir: 'pathlib.Path' = None # to help the type checker and IDEs: @@ -599,8 +586,8 @@ def __init__(self, storage: Union[SQLiteStorage, JujuStorage], # {(parent_path, kind): cls} # (parent_path, kind) is the address of _this_ object: the parent path # plus a 'kind' string that is the name of this object. - self._type_registry: Dict[_ObjectPath, Type[_Serializable]] = {} - self._type_known: Set[Type[_Serializable]] = set() + self._type_registry: Dict[_ObjectPath, Type[Serializable]] = {} + self._type_known: Set[Type[Serializable]] = set() if isinstance(storage, (str, pathlib.Path)): logger.warning( @@ -652,7 +639,7 @@ def close(self): """Close the underlying backends.""" self._storage.close() - def _track(self, obj: '_Serializable'): + def _track(self, obj: 'Serializable'): """Track object and ensure it is the only object created using its handle path.""" if obj is self: # Framework objects don't track themselves @@ -662,7 +649,7 @@ def _track(self, obj: '_Serializable'): f'two objects claiming to be {obj.handle.path} have been created') self._objects[obj.handle.path] = obj - def _forget(self, obj: '_Serializable'): + def _forget(self, obj: 'Serializable'): """Stop tracking the given object. See also _track.""" self._objects.pop(obj.handle.path, None) @@ -677,7 +664,7 @@ def commit(self): self.save_snapshot(self._stored) self._storage.commit() - def register_type(self, cls: 'Type[_Serializable]', parent: Optional['_ParentHandle'], + def register_type(self, cls: Type[Serializable], parent: Optional[Union['Handle', 'Object']], kind: str = None): """Register a type to a handle.""" parent_path: Optional[str] = None @@ -710,12 +697,12 @@ def save_snapshot(self, value: Union["StoredStateData", "EventBase"]): self._storage.save_snapshot(value.handle.path, data) - def load_snapshot(self, handle: Handle) -> '_Serializable': + def load_snapshot(self, handle: Handle) -> Serializable: """Load a persistent snapshot.""" parent_path = None if handle.parent: parent_path = handle.parent.path - cls: Type[_Serializable] = self._type_registry.get((parent_path, handle.kind)) + cls: Type[Serializable] = self._type_registry.get((parent_path, handle.kind)) if not cls: raise NoTypeError(handle.path) data = self._storage.load_snapshot(handle.path) @@ -730,7 +717,7 @@ def drop_snapshot(self, handle: Handle): """Discard a persistent snapshot.""" self._storage.drop_snapshot(handle.path) - def observe(self, bound_event: BoundEvent[Any], observer: "_ObserverCallback"): + def observe(self, bound_event: BoundEvent, observer: Callable[[Any], None]): """Register observer to be called when bound_event is emitted. The bound_event is generally provided as an attribute of the object that emits @@ -996,24 +983,24 @@ class StoredStateData(Object): def __init__(self, parent: Object, attr_name: str): super().__init__(parent, attr_name) - self._cache: Dict[str, '_StorableType'] = {} + self._cache: Dict[str, Any] = {} self.dirty: bool = False - def __getitem__(self, key: str) -> '_StorableType': + def __getitem__(self, key: str) -> Any: return self._cache.get(key) - def __setitem__(self, key: str, value: '_StorableType'): + def __setitem__(self, key: str, value: Any): self._cache[key] = value self.dirty = True def __contains__(self, key: str): return key in self._cache - def snapshot(self) -> Dict[str, '_StorableType']: + def snapshot(self) -> Dict[str, Any]: """Return the current state.""" return self._cache - def restore(self, snapshot: Dict[str, '_StorableType']): + def restore(self, snapshot: Dict[str, Any]): """Restore current state to the given snapshot.""" self._cache = snapshot self.dirty = False @@ -1057,7 +1044,7 @@ def __init__(self, parent: Object, attr_name: str): def __getattr__(self, key: Literal['on']) -> ObjectEvents: pass - def __getattr__(self, key: str) -> Union['_StorableType', 'StoredObject', ObjectEvents]: + def __getattr__(self, key: str) -> Any: # "on" is the only reserved key that can't be used in the data map. if key == "on": return self._data.on # type: ignore # casting won't work for some reason @@ -1065,7 +1052,7 @@ def __getattr__(self, key: str) -> Union['_StorableType', 'StoredObject', Object raise AttributeError(f"attribute '{key}' is not stored") return _wrap_stored(self._data, self._data[key]) - def __setattr__(self, key: str, value: Union['_StorableType', '_StoredObject']): + def __setattr__(self, key: str, value: Any): if key == "on": raise AttributeError("attribute 'on' is reserved and cannot be set") @@ -1078,7 +1065,7 @@ def __setattr__(self, key: str, value: Union['_StorableType', '_StoredObject']): self._data[key] = unwrapped - def set_default(self, **kwargs: '_StorableType'): + def set_default(self, **kwargs: Any): """Set the value of any given key if it has not already been set.""" for k, v in kwargs.items(): if k not in self._data: @@ -1180,23 +1167,20 @@ def __get__(self, f'cannot find {self.__class__.__name__} attribute in type {parent_type.__name__}') -def _wrap_stored(parent_data: StoredStateData, value: '_StorableType' - ) -> Union['StoredDict', 'StoredList', 'StoredSet', '_StorableType']: +def _wrap_stored(parent_data: StoredStateData, value: Any) -> Any: if isinstance(value, dict): - return StoredDict(parent_data, value) + return StoredDict(parent_data, value) # type: ignore if isinstance(value, list): - return StoredList(parent_data, value) + return StoredList(parent_data, value) # type: ignore if isinstance(value, set): - return StoredSet(parent_data, value) + return StoredSet(parent_data, value) # type: ignore return value -def _unwrap_stored(parent_data: StoredStateData, - value: Union['_StoredObject', '_StorableType'] - ) -> '_StorableType': +def _unwrap_stored(parent_data: StoredStateData, value: Any) -> Any: if isinstance(value, (StoredDict, StoredList, StoredSet)): return value._under # pyright: reportPrivateUsage=false - return typing.cast('_StorableType', value) + return value def _wrapped_repr(obj: '_StoredObject') -> str: @@ -1207,10 +1191,10 @@ def _wrapped_repr(obj: '_StoredObject') -> str: return f"{t.__module__}.{t.__name__}()" -class StoredDict(typing.MutableMapping[Hashable, '_StorableType']): +class StoredDict(typing.MutableMapping[Hashable, Any]): """A dict-like object that uses the StoredState as backend.""" - def __init__(self, stored_data: StoredStateData, under: Dict[Any, Any]): + def __init__(self, stored_data: StoredStateData, under: Dict[Hashable, Any]): self._stored_data = stored_data self._under = under @@ -1242,7 +1226,7 @@ def __eq__(self, other: Any): __repr__ = _wrapped_repr # type: ignore -class StoredList(typing.MutableSequence['_StorableType']): +class StoredList(typing.MutableSequence[Any]): """A list-like object that uses the StoredState as backend.""" def __init__(self, stored_data: StoredStateData, under: List[Any]): @@ -1316,7 +1300,7 @@ def __ge__(self, other: Any): __repr__ = _wrapped_repr # type: ignore -class StoredSet(typing.MutableSet['_StorableType']): +class StoredSet(typing.MutableSet[Any]): """A set-like object that uses the StoredState as backend.""" def __init__(self, stored_data: StoredStateData, under: Set[Any]): diff --git a/ops/main.py b/ops/main.py index 88681fb04..e86ec3034 100755 --- a/ops/main.py +++ b/ops/main.py @@ -68,7 +68,7 @@ def _get_charm_dir(): return charm_dir -def _create_event_link(charm: 'CharmBase', bound_event: 'EventSource[Any]', +def _create_event_link(charm: 'CharmBase', bound_event: 'EventSource', link_to: Union[str, Path]): """Create a symlink for a particular event. @@ -150,7 +150,7 @@ def _emit_charm_event(charm: 'CharmBase', event_name: str): def _get_event_args(charm: 'CharmBase', - bound_event: 'BoundEvent[Any]') -> Tuple[List[Any], Dict[str, Any]]: + bound_event: 'BoundEvent') -> Tuple[List[Any], Dict[str, Any]]: event_type = bound_event.event_type model = charm.framework.model diff --git a/ops/model.py b/ops/model.py index 08ee06a50..3af40f187 100644 --- a/ops/model.py +++ b/ops/model.py @@ -48,7 +48,6 @@ TextIO, Tuple, Type, - TypeVar, Union, ) @@ -57,56 +56,28 @@ from ops._private import timeconv, yaml from ops.jujuversion import JujuVersion +# a k8s spec is a mapping from names/"types" to json/yaml spec objects +# public since it is used in ops.testing +K8sSpec = Mapping[str, Any] + if typing.TYPE_CHECKING: - from pebble import ( # pyright: reportMissingTypeStubs=false - CheckInfo, - CheckLevel, - Client, - ExecProcess, - FileInfo, - LayerDict, - Plan, - ServiceInfo, - ) from typing_extensions import TypedDict - from ops.framework import _SerializedData from ops.testing import _ConfigOption _StorageDictType = Dict[str, Optional[List['Storage']]] _BindingDictType = Dict[Union[str, 'Relation'], 'Binding'] - Numerical = Union[int, float] - - # all types that can be (de) serialized to json(/yaml) fom Python builtins - JsonObject = Union[None, Numerical, bool, str, - Dict[str, 'JsonObject'], - List['JsonObject'], - Tuple['JsonObject', ...]] - - # a k8s spec is a mapping from names/"types" to json/yaml spec objects - # public since it is used in ops.testing - K8sSpec = Mapping[str, JsonObject] _StatusDict = TypedDict('_StatusDict', {'status': str, 'message': str}) - # the data structure we can use to initialize pebble layers with. - _Layer = Union[str, LayerDict, pebble.Layer] - # mapping from relation name to a list of relation objects _RelationMapping_Raw = Dict[str, Optional[List['Relation']]] - # mapping from relation name to relation metadata - _RelationsMeta_Raw = Dict[str, ops.charm.RelationMeta] # mapping from container name to container metadata _ContainerMeta_Raw = Dict[str, ops.charm.ContainerMeta] - _NetworkAddress = Union[ipaddress.IPv4Address, ipaddress.IPv6Address, str] - _Network = Union[ipaddress.IPv4Network, ipaddress.IPv6Network] - - _ServiceInfoMapping = Mapping[str, pebble.ServiceInfo] # relation data is a string key: string value mapping so far as the # controller is concerned _RelationDataContent_Raw = Dict[str, str] - UnitOrApplication = Union['Unit', 'Application'] UnitOrApplicationType = Union[Type['Unit'], Type['Application']] _AddressDict = TypedDict('_AddressDict', { @@ -143,7 +114,7 @@ def __init__(self, meta: 'ops.charm.CharmMeta', backend: '_ModelBackend'): self._cache = _ModelCache(meta, backend) self._backend = backend self._unit = self.get_unit(self._backend.unit_name) - relations: _RelationsMeta_Raw = meta.relations + relations: Dict[str, 'ops.RelationMeta'] = meta.relations self._relations = RelationMapping(relations, self.unit, self._backend, self._cache) self._config = ConfigData(self._backend) resources: Iterable[str] = meta.resources @@ -291,16 +262,13 @@ def get_secret(self, *, id: Optional[str] = None, label: Optional[str] = None) - return Secret(self._backend, id=info.id, label=info.label) -_T = TypeVar('_T', bound='UnitOrApplication') - - class _ModelCache: def __init__(self, meta: 'ops.charm.CharmMeta', backend: '_ModelBackend'): if typing.TYPE_CHECKING: # (entity type, name): instance. _weakcachetype = weakref.WeakValueDictionary[ Tuple['UnitOrApplicationType', str], - Optional['UnitOrApplication']] + Optional[Union['Unit', 'Application']]] self._meta = meta self._backend = backend @@ -701,7 +669,7 @@ def __repr__(self): class RelationMapping(Mapping[str, List['Relation']]): """Map of relation names to lists of :class:`Relation` instances.""" - def __init__(self, relations_meta: '_RelationsMeta_Raw', our_unit: 'Unit', + def __init__(self, relations_meta: Dict[str, 'ops.RelationMeta'], our_unit: 'Unit', backend: '_ModelBackend', cache: '_ModelCache'): self._peers: Set[str] = set() for name, relation_meta in relations_meta.items(): @@ -842,7 +810,7 @@ def network(self) -> 'Network': return self._network -def _cast_network_address(raw: str) -> '_NetworkAddress': +def _cast_network_address(raw: str) -> Union[ipaddress.IPv4Address, ipaddress.IPv6Address, str]: # fields marked as network addresses need not be IPs; they could be # hostnames that juju failed to resolve. In that case, we'll log a # debug message and leave it as-is. @@ -857,27 +825,37 @@ class Network: """Network space details. Charm authors should not instantiate this directly, but should get access to the Network - definition from :meth:`Model.get_binding` and its ``network`` attribute. + definition from :meth:`Model.get_binding` and its :code:`network` attribute. + """ - Attributes: - interfaces: A list of :class:`NetworkInterface` details. This includes the - information about how your application should be configured (eg, what - IP addresses should you bind to.) - Note that multiple addresses for a single interface are represented as multiple - interfaces. (eg, ``[NetworkInfo('ens1', '10.1.1.1/32'), - NetworkInfo('ens1', '10.1.2.1/32'])``) - ingress_addresses: A list of :class:`ipaddress.ip_address` objects representing the IP - addresses that other units should use to get in touch with you. - egress_subnets: A list of :class:`ipaddress.ip_network` representing the subnets that - other units will see you connecting from. Due to things like NAT it isn't always - possible to narrow it down to a single address, but when it is clear, the CIDRs - will be constrained to a single address. (eg, 10.0.0.1/32) - Args: - network_info: A dict of network information as returned by ``network-get``. + interfaces: List['NetworkInterface'] + """A list of network interface details. This includes the information + about how your application should be configured (for example, what IP + addresses you should bind to). + + Multiple addresses for a single interface are represented as multiple + interfaces, for example:: + + [NetworkInfo('ens1', '10.1.1.1/32'), NetworkInfo('ens1', '10.1.2.1/32']) """ + """A list of IP addresses that other units should use to get in touch with you.""" + ingress_addresses: List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address, str]] + + """A list of networks representing the subnets that other units will see + you connecting from. Due to things like NAT it isn't always possible to + narrow it down to a single address, but when it is clear, the CIDRs will + be constrained to a single address (for example, 10.0.0.1/32). + """ + egress_subnets: List[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]] + def __init__(self, network_info: '_NetworkDict'): - self.interfaces: List[NetworkInterface] = [] + """Initialize a Network instance. + + Args: + network_info: A dict of network information as returned by ``network-get``. + """ + self.interfaces = [] # Treat multiple addresses on an interface as multiple logical # interfaces with the same name. for interface_info in network_info.get('bind-addresses', []): @@ -886,15 +864,17 @@ def __init__(self, network_info: '_NetworkDict'): if addrs is not None: for address_info in addrs: self.interfaces.append(NetworkInterface(interface_name, address_info)) - self.ingress_addresses: List[_NetworkAddress] = [] + + self.ingress_addresses = [] for address in network_info.get('ingress-addresses', []): self.ingress_addresses.append(_cast_network_address(address)) - self.egress_subnets: List[_Network] = [] + + self.egress_subnets = [] for subnet in network_info.get('egress-subnets', []): self.egress_subnets.append(ipaddress.ip_network(subnet)) @property - def bind_address(self) -> Optional['_NetworkAddress']: + def bind_address(self) -> Optional[Union[ipaddress.IPv4Address, ipaddress.IPv6Address, str]]: """A single address that your application should bind() to. For the common case where there is a single answer. This represents a single @@ -907,7 +887,8 @@ def bind_address(self) -> Optional['_NetworkAddress']: return None @property - def ingress_address(self) -> Optional['_NetworkAddress']: + def ingress_address( + self) -> Optional[Union[ipaddress.IPv4Address, ipaddress.IPv6Address, str]]: """The address other applications should use to connect to your unit. Due to things like public/private addresses, NAT and tunneling, the address you bind() @@ -925,11 +906,17 @@ class NetworkInterface: Charmers should not instantiate this type directly. Instead use :meth:`Model.get_binding` to get the network information for a given endpoint. + """ - Attributes: - name: The name of the interface (eg. 'eth0', or 'ens1') - subnet: An :class:`ipaddress.ip_network` representation of the IP for the network - interface. This may be a single address (eg '10.0.1.2/32') + name: str + """The name of the interface (for example, 'eth0' or 'ens1').""" + + address: Optional[Union[ipaddress.IPv4Address, ipaddress.IPv6Address, str]] + """The address of the network interface.""" + + subnet: Optional[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]] + """The subnet of the network interface. This may be a single address + (for example, '10.0.1.2/32'). """ def __init__(self, name: str, address_info: '_AddressDict'): @@ -944,7 +931,7 @@ def __init__(self, name: str, address_info: '_AddressDict'): # The value field may be empty. address_ = _cast_network_address(address) if address else None - self.address: Optional[_NetworkAddress] = address_ + self.address = address_ cidr: str = address_info.get('cidr') # The cidr field may be empty, see LP: #1864102. if cidr: @@ -954,7 +941,7 @@ def __init__(self, name: str, address_info: '_AddressDict'): subnet = ipaddress.ip_network(address) else: subnet = None - self.subnet: Optional[_Network] = subnet + self.subnet = subnet # TODO: expose a hostname/canonical name for the address here, see LP: #1864086. @@ -988,7 +975,7 @@ def __init__(self, self.rotates = rotates @classmethod - def from_dict(cls, id: str, d: '_SerializedData') -> 'SecretInfo': + def from_dict(cls, id: str, d: Dict[str, Any]) -> 'SecretInfo': """Create new SecretInfo object from ID and dict parsed from JSON.""" expires = typing.cast(Optional[str], d.get('expires')) try: @@ -1227,7 +1214,7 @@ def remove_revision(self, revision: int): Args: revision: The secret revision to remove. If being called from a secret event, this should usually be set to - :attr:`SecretEvent.revision`. + :attr:`SecretRemoveEvent.revision`. """ if self._id is None: self._id = self.get_info().id @@ -1248,8 +1235,8 @@ class Relation: """Represents an established relation between this application and another application. This class should not be instantiated directly, instead use :meth:`Model.get_relation` - or :attr:`ops.charm.RelationEvent.relation`. This is principally used by - :class:`ops.charm.RelationMeta` to represent the relationships between charms. + or :attr:`ops.RelationEvent.relation`. This is principally used by + :class:`ops.RelationMeta` to represent the relationships between charms. Attributes: name: The name of the local endpoint of the relation (eg 'db') @@ -1298,7 +1285,7 @@ def __repr__(self): return f'<{type(self).__module__}.{type(self).__name__} {self.name}:{self.id}>' -class RelationData(Mapping['UnitOrApplication', 'RelationDataContent']): +class RelationData(Mapping[Union['Unit', 'Application'], 'RelationDataContent']): """Represents the various data buckets of a given relation. Each unit and application involved in a relation has their own data bucket. @@ -1315,7 +1302,7 @@ class RelationData(Mapping['UnitOrApplication', 'RelationDataContent']): def __init__(self, relation: Relation, our_unit: Unit, backend: '_ModelBackend'): self.relation = weakref.proxy(relation) - self._data: Dict[UnitOrApplication, RelationDataContent] = { + self._data: Dict[Union['Unit', 'Application'], RelationDataContent] = { our_unit: RelationDataContent(self.relation, our_unit, backend), our_unit.app: RelationDataContent(self.relation, our_unit.app, backend), } @@ -1328,7 +1315,7 @@ def __init__(self, relation: Relation, our_unit: Unit, backend: '_ModelBackend') self.relation.app: RelationDataContent(self.relation, self.relation.app, backend), }) - def __contains__(self, key: 'UnitOrApplication'): + def __contains__(self, key: Union['Unit', 'Application']): return key in self._data def __len__(self): @@ -1337,7 +1324,7 @@ def __len__(self): def __iter__(self): return iter(self._data) - def __getitem__(self, key: 'UnitOrApplication'): + def __getitem__(self, key: Union['Unit', 'Application']): if key is None and self.relation.app is None: # NOTE: if juju gets fixed to set JUJU_REMOTE_APP for relation-broken events, then that # should fix the only case in which we expect key to be None - potentially removing the @@ -1358,7 +1345,7 @@ def __repr__(self): class RelationDataContent(LazyMapping, MutableMapping[str, str]): """Data content of a unit or application in a relation.""" - def __init__(self, relation: 'Relation', entity: 'UnitOrApplication', + def __init__(self, relation: 'Relation', entity: Union['Unit', 'Application'], backend: '_ModelBackend'): self.relation = relation self._entity = entity @@ -1755,7 +1742,7 @@ def index(self) -> int: @property def id(self) -> int: - """DEPRECATED (use ".index"): The index associated with the storage.""" + """Deprecated -- use :attr:`Storage.index` instead.""" logger.warning("model.Storage.id is being replaced - please use model.Storage.index") return self.index @@ -1816,18 +1803,13 @@ class Container: """ def __init__(self, name: str, backend: '_ModelBackend', - pebble_client: Optional['Client'] = None): + pebble_client: Optional['pebble.Client'] = None): self.name = name if pebble_client is None: socket_path = f'/charm/containers/{name}/pebble.socket' pebble_client = backend.get_pebble(socket_path) - self._pebble: 'Client' = pebble_client - - @property - def pebble(self) -> 'Client': - """The low-level :class:`ops.pebble.Client` instance for this container.""" - return self._pebble + self._pebble: 'pebble.Client' = pebble_client def can_connect(self) -> bool: """Report whether the Pebble API is reachable in the container. @@ -1909,7 +1891,8 @@ def stop(self, *service_names: str): self._pebble.stop_services(service_names) - def add_layer(self, label: str, layer: '_Layer', *, combine: bool = False): + def add_layer(self, label: str, layer: Union[str, pebble.LayerDict, pebble.Layer], *, + combine: bool = False): """Dynamically add a new layer onto the Pebble configuration layers. Args: @@ -1925,7 +1908,7 @@ def add_layer(self, label: str, layer: '_Layer', *, combine: bool = False): """ self._pebble.add_layer(label, layer, combine=combine) - def get_plan(self) -> 'Plan': + def get_plan(self) -> 'pebble.Plan': """Get the combined Pebble configuration. This will immediately reflect changes from any previous @@ -1934,7 +1917,7 @@ def get_plan(self) -> 'Plan': """ return self._pebble.get_plan() - def get_services(self, *service_names: str) -> '_ServiceInfoMapping': + def get_services(self, *service_names: str) -> Mapping[str, 'pebble.ServiceInfo']: """Fetch and return a mapping of status information indexed by service name. If no service names are specified, return status information for all @@ -1944,7 +1927,7 @@ def get_services(self, *service_names: str) -> '_ServiceInfoMapping': services = self._pebble.get_services(names) return ServiceInfoMapping(services) - def get_service(self, service_name: str) -> 'ServiceInfo': + def get_service(self, service_name: str) -> 'pebble.ServiceInfo': """Get status information for a single named service. Raises :class:`ModelError` if service_name is not found. @@ -1959,7 +1942,7 @@ def get_service(self, service_name: str) -> 'ServiceInfo': def get_checks( self, *check_names: str, - level: Optional['CheckLevel'] = None) -> 'CheckInfoMapping': + level: Optional['pebble.CheckLevel'] = None) -> 'CheckInfoMapping': """Fetch and return a mapping of check information indexed by check name. Args: @@ -1971,7 +1954,7 @@ def get_checks( checks = self._pebble.get_checks(names=check_names or None, level=level) return CheckInfoMapping(checks) - def get_check(self, check_name: str) -> 'CheckInfo': + def get_check(self, check_name: str) -> 'pebble.CheckInfo': """Get check information for a single named check. Raises :class:`ModelError` if check_name is not found. @@ -2040,7 +2023,7 @@ def push(self, group_id=group_id, group=group) def list_files(self, path: StrOrPath, *, pattern: Optional[str] = None, - itself: bool = False) -> List['FileInfo']: + itself: bool = False) -> List['pebble.FileInfo']: """Return list of directory entries from given path on remote system. Despite the name, this method returns a list of files *and* @@ -2206,8 +2189,8 @@ def pull_path(self, raise MultiPushPullError('failed to pull one or more files', errors) @staticmethod - def _build_fileinfo(path: StrOrPath) -> 'FileInfo': - """Constructs a FileInfo object by stat'ing a local path.""" + def _build_fileinfo(path: StrOrPath) -> 'pebble.FileInfo': + """Construct a :class:`pebble.FileInfo` object by stat'ing a local path.""" path = Path(path) if path.is_symlink(): ftype = pebble.FileType.SYMLINK @@ -2235,8 +2218,8 @@ def _build_fileinfo(path: StrOrPath) -> 'FileInfo': @staticmethod def _list_recursive(list_func: Callable[[Path], - Iterable['FileInfo']], - path: Path) -> Generator['FileInfo', None, None]: + Iterable['pebble.FileInfo']], + path: Path) -> Generator['pebble.FileInfo', None, None]: """Recursively lists all files under path using the given list_func. Args: @@ -2348,7 +2331,7 @@ def exec( stderr: Optional[Union[TextIO, BinaryIO]] = None, encoding: str = 'utf-8', combine_stderr: bool = False - ) -> 'ExecProcess': + ) -> 'pebble.ExecProcess': """Execute the given command on the remote system. See :meth:`ops.pebble.Client.exec` for documentation of the parameters @@ -2387,6 +2370,12 @@ def send_signal(self, sig: Union[int, str], *service_names: str): self._pebble.send_signal(sig, service_names) + # Define this last to avoid clashes with the imported "pebble" module + @property + def pebble(self) -> 'pebble.Client': + """The low-level :class:`ops.pebble.Client` instance for this container.""" + return self._pebble + class ContainerMapping(Mapping[str, Container]): """Map of container names to Container objects. @@ -2411,14 +2400,14 @@ def __repr__(self): return repr(self._containers) -class ServiceInfoMapping(Mapping[str, 'ServiceInfo']): +class ServiceInfoMapping(Mapping[str, 'pebble.ServiceInfo']): """Map of service names to :class:`ops.pebble.ServiceInfo` objects. This is done as a mapping object rather than a plain dictionary so that we can extend it later, and so it's not mutable. """ - def __init__(self, services: Iterable['ServiceInfo']): + def __init__(self, services: Iterable['pebble.ServiceInfo']): self._services = {s.name: s for s in services} def __getitem__(self, key: str): @@ -2434,14 +2423,14 @@ def __repr__(self): return repr(self._services) -class CheckInfoMapping(Mapping[str, 'CheckInfo']): - """Map of check names to :class:`ops.pebble.CheckInfo` objects. +class CheckInfoMapping(Mapping[str, 'pebble.CheckInfo']): + """Map of check names to :class:`pebble.CheckInfo` objects. This is done as a mapping object rather than a plain dictionary so that we can extend it later, and so it's not mutable. """ - def __init__(self, checks: Iterable['CheckInfo']): + def __init__(self, checks: Iterable['pebble.CheckInfo']): self._checks = {c.name: c for c in checks} def __getitem__(self, key: str): @@ -2511,7 +2500,7 @@ class SecretNotFoundError(ModelError): _ACTION_RESULT_KEY_REGEX = re.compile(r'^[a-z0-9](([a-z0-9-.]+)?[a-z0-9])?$') -def _format_action_result_dict(input: Dict[str, 'JsonObject'], +def _format_action_result_dict(input: Dict[str, Any], parent_key: Optional[str] = None, output: Optional[Dict[str, str]] = None ) -> Dict[str, str]: @@ -2558,7 +2547,7 @@ def _format_action_result_dict(input: Dict[str, 'JsonObject'], key = f"{parent_key}.{key}" if isinstance(value, MutableMapping): - value = typing.cast(Dict[str, 'JsonObject'], value) + value = typing.cast(Dict[str, Any], value) output_ = _format_action_result_dict(value, key, output_) elif key in output_: raise ValueError("duplicate key detected in dictionary passed to 'action-set': {!r}" @@ -2603,7 +2592,7 @@ def __init__(self, unit_name: Optional[str] = None, def _run(self, *args: str, return_output: bool = False, use_json: bool = False, input_stream: Optional[str] = None - ) -> Union[str, 'JsonObject', None]: + ) -> Union[str, Any, None]: kwargs = dict(stdout=PIPE, stderr=PIPE, check=True, encoding='utf-8') if input_stream: kwargs.update({"input": input_stream}) @@ -2754,8 +2743,8 @@ def resource_get(self, resource_name: str) -> str: out = self._run('resource-get', resource_name, return_output=True) return typing.cast(str, out).strip() - def pod_spec_set(self, spec: Mapping[str, 'JsonObject'], - k8s_resources: Optional[Mapping[str, 'JsonObject']] = None): + def pod_spec_set(self, spec: Mapping[str, Any], + k8s_resources: Optional[Mapping[str, Any]] = None): tmpdir = Path(tempfile.mkdtemp('-pod-spec-set')) try: spec_path = tmpdir / 'spec.yaml' @@ -2853,7 +2842,7 @@ def action_get(self) -> Dict[str, str]: # todo: what do we know about this dict out = self._run('action-get', return_output=True, use_json=True) return typing.cast(Dict[str, str], out) - def action_set(self, results: '_SerializedData') -> None: + def action_set(self, results: Dict[str, Any]) -> None: # The Juju action-set hook tool cannot interpret nested dicts, so we use a helper to # flatten out any nested dict structures into a dotted notation, and validate keys. flat_results = _format_action_result_dict(results) @@ -2906,7 +2895,7 @@ def network_get(self, binding_name: str, relation_id: Optional[int] = None) -> ' raise RelationNotFoundError() from e raise - def add_metrics(self, metrics: Mapping[str, 'Numerical'], + def add_metrics(self, metrics: Mapping[str, Union[int, float]], labels: Optional[Mapping[str, str]] = None) -> None: cmd: List[str] = ['add-metric'] if labels: @@ -2925,8 +2914,8 @@ def add_metrics(self, metrics: Mapping[str, 'Numerical'], cmd.extend(metric_args) self._run(*cmd) - def get_pebble(self, socket_path: str) -> 'Client': - """Create a pebble.Client instance from given socket path.""" + def get_pebble(self, socket_path: str) -> 'pebble.Client': + """Create a :class:`pebble.Client` instance from given socket path.""" return pebble.Client(socket_path=socket_path) def planned_units(self) -> int: @@ -2948,7 +2937,7 @@ def planned_units(self) -> int: num_alive = sum(1 for unit in units.values() if unit['status'] != 'dying') return num_alive - def update_relation_data(self, relation_id: int, _entity: 'UnitOrApplication', + def update_relation_data(self, relation_id: int, _entity: Union['Unit', 'Application'], key: str, value: str): self.relation_set(relation_id, key, value, isinstance(_entity, Application)) @@ -2978,7 +2967,7 @@ def secret_get(self, *, return typing.cast(Dict[str, str], result) def _run_for_secret(self, *args: str, return_output: bool = False, - use_json: bool = False) -> Union[str, 'JsonObject', None]: + use_json: bool = False) -> Union[str, Any, None]: try: return self._run(*args, return_output=return_output, use_json=use_json) except ModelError as e: @@ -2995,9 +2984,9 @@ def secret_info_get(self, *, elif label is not None: # elif because Juju secret-info-get doesn't allow id and label args.extend(['--label', label]) result = self._run_for_secret('secret-info-get', *args, return_output=True, use_json=True) - info_dicts = typing.cast(Dict[str, 'JsonObject'], result) + info_dicts = typing.cast(Dict[str, Any], result) id = list(info_dicts)[0] # Juju returns dict of {secret_id: {info}} - return SecretInfo.from_dict(id, typing.cast('_SerializedData', info_dicts[id])) + return SecretInfo.from_dict(id, typing.cast(Dict[str, Any], info_dicts[id])) def secret_set(self, id: str, *, content: Optional[Dict[str, str]] = None, @@ -3120,7 +3109,7 @@ def validate_metric_label(cls, label_name: str): label_name, cls.METRIC_KEY_REGEX.pattern)) @classmethod - def format_metric_value(cls, value: 'Numerical'): + def format_metric_value(cls, value: Union[int, float]): if not isinstance(value, (int, float)): # pyright: reportUnnecessaryIsInstance=false raise ModelError('invalid metric value {!r} provided:' ' must be a positive finite float'.format(value)) diff --git a/ops/pebble.py b/ops/pebble.py index b5179657a..7d46a3b03 100644 --- a/ops/pebble.py +++ b/ops/pebble.py @@ -61,6 +61,52 @@ from ops._private import timeconv, yaml +# Public as these are used in the Container.add_layer signature +ServiceDict = typing.TypedDict('ServiceDict', + {'summary': str, + 'description': str, + 'startup': str, + 'override': str, + 'command': str, + 'after': Sequence[str], + 'before': Sequence[str], + 'requires': Sequence[str], + 'environment': Dict[str, str], + 'user': str, + 'user-id': Optional[int], + 'group': str, + 'group-id': Optional[int], + 'on-success': str, + 'on-failure': str, + 'on-check-failure': Dict[str, Any], + 'backoff-delay': str, + 'backoff-factor': Optional[int], + 'backoff-limit': str, + }, + total=False) + +HttpDict = typing.TypedDict('HttpDict', {'url': str}) +TcpDict = typing.TypedDict('TcpDict', {'port': int}) +ExecDict = typing.TypedDict('ExecDict', {'command': str}) + +CheckDict = typing.TypedDict('CheckDict', + {'override': str, + 'level': Union['CheckLevel', str], + 'period': Optional[str], + 'timeout': Optional[str], + 'http': Optional[HttpDict], + 'tcp': Optional[TcpDict], + 'exec': Optional[ExecDict], + 'threshold': Optional[int]}, + total=False) + +LayerDict = typing.TypedDict('LayerDict', + {'summary': str, + 'description': str, + 'services': Dict[str, ServiceDict], + 'checks': Dict[str, CheckDict]}, + total=False) + if TYPE_CHECKING: from email.message import Message @@ -71,7 +117,6 @@ class _BodyHandler(Protocol): def __call__(self, data: bytes, done: bool = False) -> None: ... # noqa _HeaderHandler = Callable[[bytes], None] - _StrOrBytes = Union[str, bytes] # tempfile.NamedTemporaryFile has an odd interface because of that # 'name' attribute, so we need to make a Protocol for it. @@ -86,7 +131,7 @@ def write(self, __s: typing.AnyStr) -> int: ... # noqa def __enter__(self) -> typing.IO[typing.AnyStr]: ... # noqa class _Readable(Protocol): - def read(self, n: int = -1) -> _StrOrBytes: ... # noqa + def read(self, n: int = -1) -> Union[str, bytes]: ... # noqa class _Writeable(Protocol): # We'd need something like io.ReadableBuffer here, @@ -115,19 +160,6 @@ def write(self, buf: Union[bytes, str, bytearray]) -> int: ... # noqa "group-id": Optional[int], "group": Optional[str], "type": Union['FileType', str]}) - _HttpDict = TypedDict('_HttpDict', {'url': str}) - _TcpDict = TypedDict('_TcpDict', {'port': int}) - _ExecDict = TypedDict('_ExecDict', {'command': str}) - _CheckDict = TypedDict('_CheckDict', - {'override': str, - 'level': Union['CheckLevel', str], - 'period': Optional[str], - 'timeout': Optional[str], - 'http': Optional[_HttpDict], - 'tcp': Optional[_TcpDict], - 'exec': Optional[_ExecDict], - 'threshold': Optional[int]}, - total=False) _AuthDict = TypedDict('_AuthDict', {'permissions': Optional[str], @@ -143,28 +175,6 @@ def write(self, buf: Union[bytes, str, bytearray]) -> int: ... # noqa {'startup': Union['ServiceStartup', str], 'current': Union['ServiceStatus', str], 'name': str}) - _ServiceDict = TypedDict('_ServiceDict', - {'summary': str, - 'description': str, - 'startup': str, - 'override': str, - 'command': str, - 'after': Sequence[str], - 'before': Sequence[str], - 'requires': Sequence[str], - 'environment': Dict[str, str], - 'user': str, - 'user-id': Optional[int], - 'group': str, - 'group-id': Optional[int], - 'on-success': str, - 'on-failure': str, - 'on-check-failure': Dict[str, Any], - 'backoff-delay': str, - 'backoff-factor': Optional[int], - 'backoff-limit': str, - }, - total=False) _ProgressDict = TypedDict('_ProgressDict', {'label': str, @@ -195,17 +205,9 @@ def write(self, buf: Union[bytes, str, bytearray]) -> int: ... # noqa 'data': Optional[_ChangeData]}) _PlanDict = TypedDict('_PlanDict', - {'services': Dict[str, _ServiceDict], - 'checks': Dict[str, _CheckDict]}, + {'services': Dict[str, ServiceDict], + 'checks': Dict[str, CheckDict]}, total=False) - # public as it is accessed by ops.testing - LayerDict = TypedDict('LayerDict', - {'summary': str, - 'description': str, - 'services': Dict[str, _ServiceDict], - 'checks': Dict[str, _CheckDict]}, - total=False) - _Error = TypedDict('_Error', {'kind': str, 'message': str}) @@ -387,8 +389,8 @@ def __init__( self, command: List[str], exit_code: int, - stdout: Optional['_StrOrBytes'], - stderr: Optional['_StrOrBytes'], + stdout: Optional[Union[str, bytes]], + stderr: Optional[Union[str, bytes]], ): self.command = command self.exit_code = exit_code @@ -759,9 +761,9 @@ def __eq__(self, other: Union['LayerDict', 'Layer']) -> bool: class Service: """Represents a service description in a Pebble configuration layer.""" - def __init__(self, name: str, raw: Optional['_ServiceDict'] = None): + def __init__(self, name: str, raw: Optional['ServiceDict'] = None): self.name = name - dct: _ServiceDict = raw or {} + dct: ServiceDict = raw or {} self.summary = dct.get('summary', '') self.description = dct.get('description', '') self.startup = dct.get('startup', '') @@ -782,7 +784,7 @@ def __init__(self, name: str, raw: Optional['_ServiceDict'] = None): self.backoff_factor = dct.get('backoff-factor') self.backoff_limit = dct.get('backoff-limit', '') - def to_dict(self) -> '_ServiceDict': + def to_dict(self) -> 'ServiceDict': """Convert this service object to its dict representation.""" fields = [ ('summary', self.summary), @@ -806,7 +808,7 @@ def to_dict(self) -> '_ServiceDict': ('backoff-limit', self.backoff_limit), ] dct = {name: value for name, value in fields if value} - return typing.cast('_ServiceDict', dct) + return typing.cast('ServiceDict', dct) def _merge(self, other: 'Service'): """Merges this service object with another service definition. @@ -827,7 +829,7 @@ def _merge(self, other: 'Service'): def __repr__(self) -> str: return f'Service({self.to_dict()!r})' - def __eq__(self, other: Union['_ServiceDict', 'Service']) -> bool: + def __eq__(self, other: Union['ServiceDict', 'Service']) -> bool: """Reports whether this service configuration is equal to another.""" if isinstance(other, dict): return self.to_dict() == other @@ -897,9 +899,9 @@ def __repr__(self): class Check: """Represents a check in a Pebble configuration layer.""" - def __init__(self, name: str, raw: Optional['_CheckDict'] = None): + def __init__(self, name: str, raw: Optional['CheckDict'] = None): self.name = name - dct: _CheckDict = raw or {} + dct: CheckDict = raw or {} self.override: str = dct.get('override', '') try: level: Union[CheckLevel, str] = CheckLevel(dct.get('level', '')) @@ -913,19 +915,19 @@ def __init__(self, name: str, raw: Optional['_CheckDict'] = None): http = dct.get('http') if http is not None: http = copy.deepcopy(http) - self.http: Optional[_HttpDict] = http + self.http: Optional[HttpDict] = http tcp = dct.get('tcp') if tcp is not None: tcp = copy.deepcopy(tcp) - self.tcp: Optional[_TcpDict] = tcp + self.tcp: Optional[TcpDict] = tcp exec_ = dct.get('exec') if exec_ is not None: exec_ = copy.deepcopy(exec_) - self.exec: Optional[_ExecDict] = exec_ + self.exec: Optional[ExecDict] = exec_ - def to_dict(self) -> '_CheckDict': + def to_dict(self) -> 'CheckDict': """Convert this check object to its dict representation.""" level: str = self.level.value if isinstance(self.level, CheckLevel) else self.level fields = [ @@ -939,12 +941,12 @@ def to_dict(self) -> '_CheckDict': ('exec', self.exec), ] dct = {name: value for name, value in fields if value} - return typing.cast('_CheckDict', dct) + return typing.cast('CheckDict', dct) def __repr__(self) -> str: return f'Check({self.to_dict()!r})' - def __eq__(self, other: Union['_CheckDict', 'Check']) -> bool: + def __eq__(self, other: Union['CheckDict', 'Check']) -> bool: """Reports whether this check configuration is equal to another.""" if isinstance(other, dict): return self.to_dict() == other @@ -1219,7 +1221,7 @@ def _wait(self) -> int: exit_code = change.tasks[0].data.get('exit-code', -1) return exit_code - def wait_output(self) -> Tuple['_StrOrBytes', Optional['_StrOrBytes']]: + def wait_output(self) -> Tuple[Union[str, bytes], Optional[Union[str, bytes]]]: """Wait for the process to finish and return tuple of (stdout, stderr). If a timeout was specified to the :meth:`Client.exec` call, this waits @@ -1246,8 +1248,8 @@ def wait_output(self) -> Tuple['_StrOrBytes', Optional['_StrOrBytes']]: exit_code: int = self._wait() - out_value: '_StrOrBytes' = out.getvalue() - err_value: Optional['_StrOrBytes'] = err.getvalue() if err is not None else None + out_value: Union[str, bytes] = out.getvalue() + err_value: Optional[Union[str, bytes]] = err.getvalue() if err is not None else None if exit_code != 0: raise ExecError(self._command, exit_code, out_value, err_value) @@ -1308,7 +1310,7 @@ def _websocket_to_writer(ws: '_WebSocket', writer: '_WebsocketWriter', encoding: str): """Receive messages from websocket (until end signal) and write to writer.""" while True: - chunk: _StrOrBytes = ws.recv() + chunk: Union[str, bytes] = ws.recv() if isinstance(chunk, str): try: @@ -1340,7 +1342,7 @@ def writable(self): """Denote this file-like object as writable.""" return True - def write(self, chunk: '_StrOrBytes') -> int: + def write(self, chunk: Union[str, bytes]) -> int: """Write chunk to the websocket.""" if not isinstance(chunk, bytes): raise TypeError(f'value to write must be bytes, not {type(chunk).__name__}') @@ -1364,14 +1366,14 @@ def readable(self) -> bool: """Denote this file-like object as readable.""" return True - def read(self, n: int = -1) -> '_StrOrBytes': + def read(self, n: int = -1) -> Union[str, bytes]: """Read up to n bytes from the websocket (or one message if n<0).""" if self.eof: # Calling read() multiple times after EOF should still return EOF return b'' while not self.remaining: - chunk: _StrOrBytes = self.ws.recv() + chunk: Union[str, bytes] = self.ws.recv() if isinstance(chunk, str): try: @@ -1393,11 +1395,11 @@ def read(self, n: int = -1) -> '_StrOrBytes': if n < 0: n = len(self.remaining) - result: '_StrOrBytes' = self.remaining[:n] + result: Union[str, bytes] = self.remaining[:n] self.remaining = self.remaining[n:] return result - def read1(self, n: int = -1) -> '_StrOrBytes': + def read1(self, n: int = -1) -> Union[str, bytes]: """An alias for read.""" return self.read(n) @@ -1955,7 +1957,7 @@ def generator() -> Generator[bytes, None, None]: b'\r\n', ]) - content: '_StrOrBytes' = source_io.read(self._chunk_size) + content: Union[str, bytes] = source_io.read(self._chunk_size) while content: if isinstance(content, str): content = content.encode(encoding) diff --git a/ops/testing.py b/ops/testing.py index cdde47020..fbbebcc31 100755 --- a/ops/testing.py +++ b/ops/testing.py @@ -60,7 +60,7 @@ if TYPE_CHECKING: from typing_extensions import TypedDict - from ops.model import UnitOrApplication, _NetworkDict + from ops.model import _NetworkDict ReadableBuffer = Union[bytes, str, StringIO, BytesIO, BinaryIO] _StringOrPath = Union[str, pathlib.PurePosixPath, pathlib.Path] @@ -79,11 +79,10 @@ 'units': List[str] }) - _ConfigValue = Union[str, int, float, bool] _ConfigOption = TypedDict('_ConfigOption', { 'type': Literal['string', 'int', 'float', 'boolean'], 'description': str, - 'default': _ConfigValue + 'default': Union[str, int, float, bool], }) _StatusName = Literal['unknown', 'blocked', 'active', 'maintenance', 'waiting'] _RawStatus = TypedDict('_RawStatus', { @@ -215,7 +214,7 @@ def _event_context(self, event_name: str): def set_can_connect(self, container: Union[str, model.Container], val: bool): """Change the simulated connection status of a container's underlying Pebble client. - After calling this, :meth:`ops.model.Container.can_connect` will return val. + After calling this, :meth:`ops.Container.can_connect` will return val. """ if isinstance(container, str): container = self.model.unit.get_container(container) @@ -1041,7 +1040,7 @@ def _emit_relation_changed(self, relation_id: int, app_or_unit: str): def _update_config( self, - key_values: Optional[Mapping[str, '_ConfigValue']] = None, + key_values: Optional[Mapping[str, Union[str, int, float, bool]]] = None, unset: Iterable[str] = (), ) -> None: """Update the config as seen by the charm. @@ -1078,7 +1077,7 @@ def _update_config( def update_config( self, - key_values: Optional[Mapping[str, '_ConfigValue']] = None, + key_values: Optional[Mapping[str, Union[str, int, float, bool]]] = None, unset: Iterable[str] = (), ) -> None: """Update the config as seen by the charm. @@ -1247,8 +1246,8 @@ def add_model_secret(self, owner: AppUnitOrName, content: Dict[str, str]) -> str """Add a secret owned by the remote application or unit specified. This is named :code:`add_model_secret` instead of :code:`add_secret` - to avoid confusion with the :meth:`ops.model.Application.add_secret` - and :meth:`ops.model.Unit.add_secret` methods used by secret owner + to avoid confusion with the :meth:`ops.Application.add_secret` + and :meth:`ops.Unit.add_secret` methods used by secret owner charms. Args: @@ -1479,7 +1478,7 @@ def decorator(target_cls: Any): @_record_calls -class _TestingConfig(Dict[str, '_ConfigValue']): +class _TestingConfig(Dict[str, Union[str, int, float, bool]]): """Represents the Juju Config.""" _supported_types = { 'string': str, @@ -1499,7 +1498,7 @@ def __init__(self, config: 'RawConfig'): self._config_set(key, value) @staticmethod - def _load_defaults(charm_config: 'RawConfig') -> Dict[str, '_ConfigValue']: + def _load_defaults(charm_config: 'RawConfig') -> Dict[str, Union[str, int, float, bool]]: """Load default values from config.yaml. Handle the case where a user doesn't supply explicit config snippets. @@ -1509,7 +1508,7 @@ def _load_defaults(charm_config: 'RawConfig') -> Dict[str, '_ConfigValue']: cfg: Dict[str, '_ConfigOption'] = charm_config.get('options', {}) return {key: value.get('default', None) for key, value in cfg.items()} - def _config_set(self, key: str, value: '_ConfigValue'): + def _config_set(self, key: str, value: Union[str, int, float, bool]): # this is only called by the harness itself # we don't do real serialization/deserialization, but we do check that the value # has the expected type. @@ -1717,7 +1716,7 @@ def relation_get(self, relation_id: int, member_name: str, is_app: bool): raise model.RelationNotFoundError() return self._relation_data_raw[relation_id][member_name] - def update_relation_data(self, relation_id: int, _entity: 'UnitOrApplication', + def update_relation_data(self, relation_id: int, _entity: Union[model.Unit, model.Application], key: str, value: str): # this is where the 'real' backend would call relation-set. raw_data = self._relation_data_raw[relation_id][_entity.name] diff --git a/tox.ini b/tox.ini index e7d7159c3..0bc99fa2a 100644 --- a/tox.ini +++ b/tox.ini @@ -28,7 +28,7 @@ description = Build the Sphinx docs deps = -r{toxinidir}/docs/requirements.txt commands = - sphinx-build -M html docs/ docs/_build + sphinx-build -W --keep-going docs/ docs/_build/html [testenv:fmt] description = Apply coding style standards to code From 057e29a71e5194f5add4d8967950397e58b159cc Mon Sep 17 00:00:00 2001 From: Ben Hoyt Date: Thu, 8 Jun 2023 10:33:28 +1200 Subject: [PATCH 2/5] Make PlanDict public; Sort nitpick_ignore list; Couple other tweaks --- docs/conf.py | 29 +++++++++++++---------------- ops/pebble.py | 29 ++++++++++++++--------------- 2 files changed, 27 insertions(+), 31 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index edeccde70..a1822577b 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -73,31 +73,28 @@ def _compute_navigation_tree(context): # domain name if present. Example entries would be ('py:func', 'int') or # ('envvar', 'LD_LIBRARY_PATH'). nitpick_ignore = [ - ('py:class', 'ops.model._ModelBackend'), - ('py:class', 'ops.model._ModelCache'), ('py:class', '_AddressDict'), - ('py:class', '_NetworkDict'), - ('py:class', '_RelationMetaDict'), - ('py:class', '_ResourceMetaDict'), - ('py:class', '_StorageMetaDict'), - ('py:class', '_ChangeData'), ('py:class', '_ChangeDict'), - ('py:class', '_InfoDict'), + ('py:class', '_CheckInfoDict'), + ('py:class', '_FileInfoDict'), ('py:class', '_IOSource'), - ('py:class', '_TextOrBinaryIO'), + ('py:class', '_NetworkDict'), + ('py:class', '_ProgressDict'), ('py:class', '_Readable'), - ('py:class', '_Writeable'), - ('py:class', '_WebSocket'), - ('py:class', '_FileInfoDict'), - ('py:class', '_PlanDict'), + ('py:class', '_RelationMetaDict'), + ('py:class', '_ResourceMetaDict'), ('py:class', '_ServiceInfoDict'), + ('py:class', '_StorageMetaDict'), ('py:class', '_SystemInfoDict'), - ('py:class', '_TaskData'), ('py:class', '_TaskDict'), - ('py:class', '_ProgressDict'), + ('py:class', '_TextOrBinaryIO'), ('py:class', '_WarningDict'), - ('py:class', 'ops.storage.SQLiteStorage'), + ('py:class', '_WebSocket'), + ('py:class', '_Writeable'), + ('py:class', 'ops.model._ModelBackend'), + ('py:class', 'ops.model._ModelCache'), ('py:class', 'ops.storage.JujuStorage'), + ('py:class', 'ops.storage.SQLiteStorage'), ('py:class', 'ops.testing.CharmType'), ('py:obj', 'ops.testing.CharmType'), ] diff --git a/ops/pebble.py b/ops/pebble.py index 7d46a3b03..eefa3329f 100644 --- a/ops/pebble.py +++ b/ops/pebble.py @@ -107,6 +107,11 @@ 'checks': Dict[str, CheckDict]}, total=False) +PlanDict = typing.TypedDict('PlanDict', + {'services': Dict[str, ServiceDict], + 'checks': Dict[str, CheckDict]}, + total=False) + if TYPE_CHECKING: from email.message import Message @@ -143,7 +148,7 @@ def write(self, buf: Union[bytes, str, bytearray]) -> int: ... # noqa _IOSource = Union[str, bytes, _AnyStrFileLikeIO] _SystemInfoDict = TypedDict('_SystemInfoDict', {'version': str}) - _InfoDict = TypedDict('_InfoDict', + _CheckInfoDict = TypedDict('_CheckInfoDict', {"name": str, "level": Optional[Union['CheckLevel', str]], "status": Union['CheckStatus', str], @@ -180,7 +185,6 @@ def write(self, buf: Union[bytes, str, bytearray]) -> int: ... # noqa {'label': str, 'done': int, 'total': int}) - _TaskData = Dict[str, Any] _TaskDict = TypedDict('_TaskDict', {'id': 'TaskID', 'kind': str, @@ -190,8 +194,7 @@ def write(self, buf: Union[bytes, str, bytearray]) -> int: ... # noqa 'progress': _ProgressDict, 'spawn-time': str, 'ready-time': str, - 'data': Optional[_TaskData]}) - _ChangeData = TypedDict('_ChangeData', {}) + 'data': Optional[Dict[str, Any]]}) _ChangeDict = TypedDict('_ChangeDict', {'id': str, 'kind': str, @@ -202,12 +205,8 @@ def write(self, buf: Union[bytes, str, bytearray]) -> int: ... # noqa 'tasks': Optional[List[_TaskDict]], 'err': Optional[str], 'ready-time': Optional[str], - 'data': Optional[_ChangeData]}) + 'data': Optional[Dict[str, Any]]}) - _PlanDict = TypedDict('_PlanDict', - {'services': Dict[str, ServiceDict], - 'checks': Dict[str, CheckDict]}, - total=False) _Error = TypedDict('_Error', {'kind': str, 'message': str}) @@ -541,7 +540,7 @@ def __init__( progress: TaskProgress, spawn_time: datetime.datetime, ready_time: Optional[datetime.datetime], - data: Optional['_TaskData'] = None, + data: Optional[Dict[str, Any]] = None, ): self.id = id self.kind = kind @@ -604,7 +603,7 @@ def __init__( err: Optional[str], spawn_time: datetime.datetime, ready_time: Optional[datetime.datetime], - data: Optional['_ChangeData'] = None, + data: Optional[Dict[str, Any]] = None, ): self.id = id self.kind = kind @@ -658,7 +657,7 @@ class Plan: def __init__(self, raw: str): d = yaml.safe_load(raw) or {} # type: ignore - d = typing.cast('_PlanDict', d) + d = typing.cast('PlanDict', d) self._raw = raw self._services: Dict[str, Service] = {name: Service(name, service) @@ -682,14 +681,14 @@ def checks(self) -> Dict[str, 'Check']: """ return self._checks - def to_dict(self) -> '_PlanDict': + def to_dict(self) -> 'PlanDict': """Convert this plan to its dict representation.""" fields = [ ('services', {name: service.to_dict() for name, service in self._services.items()}), ('checks', {name: check.to_dict() for name, check in self._checks.items()}), ] dct = {name: value for name, value in fields if value} - return typing.cast('_PlanDict', dct) + return typing.cast('PlanDict', dct) def to_yaml(self) -> str: """Return this plan's YAML representation.""" @@ -1079,7 +1078,7 @@ def __init__( self.threshold = threshold @classmethod - def from_dict(cls, d: '_InfoDict') -> 'CheckInfo': + def from_dict(cls, d: '_CheckInfoDict') -> 'CheckInfo': """Create new :class:`CheckInfo` object from dict parsed from JSON.""" try: level = CheckLevel(d.get('level', '')) From 6aac27d7f8f8bb40c43d8c500401907c2590f78f Mon Sep 17 00:00:00 2001 From: Ben Hoyt Date: Thu, 8 Jun 2023 10:38:09 +1200 Subject: [PATCH 3/5] Indentation fixes --- ops/pebble.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ops/pebble.py b/ops/pebble.py index eefa3329f..be5ceca41 100644 --- a/ops/pebble.py +++ b/ops/pebble.py @@ -108,9 +108,9 @@ total=False) PlanDict = typing.TypedDict('PlanDict', - {'services': Dict[str, ServiceDict], - 'checks': Dict[str, CheckDict]}, - total=False) + {'services': Dict[str, ServiceDict], + 'checks': Dict[str, CheckDict]}, + total=False) if TYPE_CHECKING: from email.message import Message @@ -149,11 +149,11 @@ def write(self, buf: Union[bytes, str, bytearray]) -> int: ... # noqa _SystemInfoDict = TypedDict('_SystemInfoDict', {'version': str}) _CheckInfoDict = TypedDict('_CheckInfoDict', - {"name": str, - "level": Optional[Union['CheckLevel', str]], - "status": Union['CheckStatus', str], - "failures": int, - "threshold": int}) + {"name": str, + "level": Optional[Union['CheckLevel', str]], + "status": Union['CheckStatus', str], + "failures": int, + "threshold": int}) _FileInfoDict = TypedDict('_FileInfoDict', {"path": str, "name": str, From bdbd85d9920b13a188d4bb49ca6106dcd1224f7b Mon Sep 17 00:00:00 2001 From: Ben Hoyt Date: Thu, 8 Jun 2023 10:58:36 +1200 Subject: [PATCH 4/5] Trivial tweaks --- ops/charm.py | 2 +- ops/model.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/ops/charm.py b/ops/charm.py index fc2d4d636..acf625558 100755 --- a/ops/charm.py +++ b/ops/charm.py @@ -1142,7 +1142,7 @@ class StorageMeta: storage_name: Name of storage type: Storage type description: A text description of the storage - read_only: Whether the storage is read only + read_only: True if the storage is read-only minimum_size: Minimum size of storage location: Mount point of storage multiple_range: Range of numeric qualifiers when multiple storage units are used diff --git a/ops/model.py b/ops/model.py index 3af40f187..a38cfc5d6 100644 --- a/ops/model.py +++ b/ops/model.py @@ -57,7 +57,6 @@ from ops.jujuversion import JujuVersion # a k8s spec is a mapping from names/"types" to json/yaml spec objects -# public since it is used in ops.testing K8sSpec = Mapping[str, Any] if typing.TYPE_CHECKING: From f6d64695a849665a0c8714377c3eb5ee9f4bec0e Mon Sep 17 00:00:00 2001 From: Ben Hoyt Date: Tue, 13 Jun 2023 14:03:37 +1200 Subject: [PATCH 5/5] Make exec stdin/stdout/stderr arguments correctly generic too --- ops/model.py | 18 +++++++++--------- ops/pebble.py | 19 ++++++++++--------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/ops/model.py b/ops/model.py index 3df94d295..51d03883c 100644 --- a/ops/model.py +++ b/ops/model.py @@ -2333,9 +2333,9 @@ def exec( # noqa user: Optional[str] = None, group_id: Optional[int] = None, group: Optional[str] = None, - stdin: Optional[Union[str, bytes, TextIO, BinaryIO]] = None, - stdout: Optional[Union[TextIO, BinaryIO]] = None, - stderr: Optional[Union[TextIO, BinaryIO]] = None, + stdin: Optional[Union[str, TextIO]] = None, + stdout: Optional[TextIO] = None, + stderr: Optional[TextIO] = None, encoding: str = 'utf-8', combine_stderr: bool = False ) -> pebble.ExecProcess[str]: @@ -2354,9 +2354,9 @@ def exec( # noqa user: Optional[str] = None, group_id: Optional[int] = None, group: Optional[str] = None, - stdin: Optional[Union[str, bytes, TextIO, BinaryIO]] = None, - stdout: Optional[Union[TextIO, BinaryIO]] = None, - stderr: Optional[Union[TextIO, BinaryIO]] = None, + stdin: Optional[Union[bytes, BinaryIO]] = None, + stdout: Optional[BinaryIO] = None, + stderr: Optional[BinaryIO] = None, encoding: None = None, combine_stderr: bool = False ) -> pebble.ExecProcess[bytes]: @@ -2393,9 +2393,9 @@ def exec( user=user, group_id=group_id, group=group, - stdin=stdin, - stdout=stdout, - stderr=stderr, + stdin=stdin, # type: ignore + stdout=stdout, # type: ignore + stderr=stderr, # type: ignore encoding=encoding, # type: ignore combine_stderr=combine_stderr, ) diff --git a/ops/pebble.py b/ops/pebble.py index 5acf7efc2..21ad199b1 100644 --- a/ops/pebble.py +++ b/ops/pebble.py @@ -42,6 +42,7 @@ import urllib.request import warnings from typing import ( + IO, TYPE_CHECKING, Any, AnyStr, @@ -1129,9 +1130,9 @@ class ExecProcess(Generic[AnyStr]): def __init__( self, - stdin: Optional[typing.IO[AnyStr]], - stdout: Optional[typing.IO[AnyStr]], - stderr: Optional[typing.IO[AnyStr]], + stdin: Optional[IO[AnyStr]], + stdout: Optional[IO[AnyStr]], + stderr: Optional[IO[AnyStr]], client: 'Client', timeout: Optional[float], control_ws: '_WebSocket', @@ -2057,9 +2058,9 @@ def exec( # noqa user: Optional[str] = None, group_id: Optional[int] = None, group: Optional[str] = None, - stdin: Optional[Union[str, bytes, TextIO, BinaryIO]] = None, - stdout: Optional[Union[TextIO, BinaryIO]] = None, - stderr: Optional[Union[TextIO, BinaryIO]] = None, + stdin: Optional[Union[str, TextIO]] = None, + stdout: Optional[TextIO] = None, + stderr: Optional[TextIO] = None, encoding: str = 'utf-8', combine_stderr: bool = False ) -> ExecProcess[str]: @@ -2078,9 +2079,9 @@ def exec( # noqa user: Optional[str] = None, group_id: Optional[int] = None, group: Optional[str] = None, - stdin: Optional[Union[str, bytes, TextIO, BinaryIO]] = None, - stdout: Optional[Union[TextIO, BinaryIO]] = None, - stderr: Optional[Union[TextIO, BinaryIO]] = None, + stdin: Optional[Union[bytes, BinaryIO]] = None, + stdout: Optional[BinaryIO] = None, + stderr: Optional[BinaryIO] = None, encoding: None = None, combine_stderr: bool = False ) -> ExecProcess[bytes]: