Skip to content

Commit

Permalink
Add public graph() to get Graphviz graph
Browse files Browse the repository at this point in the history
  • Loading branch information
maddenp committed Feb 14, 2024
1 parent e8dd4bf commit 31bb296
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 143 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ Several public helper callables are available in the `iotaa` module:

- `asset()` instantiates an asset to return from a task function.
- `dryrun()` activates dry-run mode.
- `graph()` returns a Graphviz representation of the most recent task execution tree.
- `logcfg()` configures Python's root logger to support `logging.info()` (et al.) calls, which `iotaa` itself makes. It is called by the `iotaa` CLI, but is available for standalone applications with simple logging needs to call programmatically.
- `logset()` accepts a Python `Logger` object and configures `iotaa` to send all future log messages to it.
- `main()` is the entry-point function for CLI use.
Expand Down
2 changes: 1 addition & 1 deletion recipe/meta.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
],
"run": []
},
"version": "0.7.0"
"version": "0.7.1"
}
2 changes: 1 addition & 1 deletion recipe/meta.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package:
name: iotaa
version: 0.7.0
version: 0.7.1
source:
path: ../src
build:
Expand Down
288 changes: 153 additions & 135 deletions src/iotaa/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
iotaa.
"""

from __future__ import annotations

import logging
import re
import sys
Expand All @@ -18,7 +20,7 @@
from types import SimpleNamespace as ns
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union

# Public API
# Public return-value classes:


@dataclass
Expand Down Expand Up @@ -47,11 +49,150 @@ class Result:
success: bool


# Types:

_AssetsT = Union[Dict[str, Asset], List[Asset]]
_AssetT = Optional[Union[_AssetsT, Asset]]

_TaskT = Callable[..., _AssetT]

# Private helper classes and their instances:


class _Graph:
"""
Graphviz digraph support.
"""

def __init__(self) -> None:
self.reset()

def __repr__(self) -> str:
"""
Returns the task/asset graph in Graphviz dot format.
"""
f = (
lambda name, shape, ready=None: '%s [fillcolor=%s, label="%s", shape=%s, style=filled]'
% (
self.name(name),
self.color[ready],
name,
shape,
)
)
edges = ["%s -> %s" % (self.name(a), self.name(b)) for a, b in self.edges]
nodes_a = [f(ref, self.shape.asset, ready()) for ref, ready in self.assets.items()]
nodes_t = [f(x, self.shape.task) for x in self.tasks]
return "digraph g {\n %s\n}" % "\n ".join(sorted(nodes_t + nodes_a + edges))

@property
def color(self) -> Dict[Any, str]:
"""
Graphviz colors.
"""
return defaultdict(lambda: "grey", [(True, "palegreen"), (False, "orange")])

def name(self, name: str) -> str:
"""
Convert an iotaa asset/task name to a Graphviz-appropriate node name.
:param name: An iotaa asset/task name.
:return: A Graphviz-appropriate node name.
"""
return "_%s" % md5(str(name).encode("utf-8")).hexdigest()

@property
def shape(self) -> ns:
"""
Graphviz shapes.
"""
return ns(asset="box", task="ellipse")

def reset(self) -> None:
"""
Reset graph state.
"""
self.assets: dict = {}
self.edges: set = set()
self.tasks: set = set()

def update_from_requirements(self, taskname: str, alist: List[Asset]) -> None:
"""
Update graph data structures with required-task info.
:param taskname: The current task's name.
:param alist: Flattened required-task assets.
"""
asset_taskname = lambda a: getattr(a, "taskname", None)
self.assets.update({a.ref: a.ready for a in alist})
self.edges |= set((asset_taskname(a), a.ref) for a in alist)
self.edges |= set((taskname, asset_taskname(a)) for a in alist)
self.tasks |= set(asset_taskname(a) for a in alist)
self.tasks.add(taskname)

def update_from_task(self, taskname: str, assets: _AssetT) -> None:
"""
Update graph data structures with current task info.
:param taskname: The current task's name.
:param assets: A collection of assets, one asset, or None.
"""
alist = _listify(assets)
self.assets.update({a.ref: a.ready for a in alist})
self.edges |= set((taskname, a.ref) for a in alist)
self.tasks.add(taskname)


_graph = _Graph()


class _Logger:
"""
Support for swappable loggers.
"""

def __init__(self) -> None:
self.logger = logging.getLogger() # default to Python root logger.

def __getattr__(self, attr: str) -> Any:
"""
Delegate attribute access to the currently-used logger.
:param attr: The attribute to access.
:returns: The requested attribute.
"""
return getattr(self.logger, attr)


_log = _Logger()


class _State:
"""
Global iotaa state.
"""

def __init__(self) -> None:
self.dry_run = False
self.initialized = False
self.reset()

def initialize(self) -> None:
"""
Mark iotaa as initialized.
"""
self.initialized = True

def reset(self) -> None:
"""
Reset state.
"""
self.initialized = False


_state = _State()

# Public API functions:


def asset(ref: Any, ready: Callable[..., bool]) -> Asset:
"""
Expand All @@ -71,6 +212,13 @@ def dryrun(enable: bool = True) -> None:
_state.dry_run = enable


def graph() -> str:
"""
Returns the Graphivz graph of the most recent task execution tree.
"""
return str(_graph)


def logcfg(verbose: bool = False) -> None:
"""
Configure default logging.
Expand Down Expand Up @@ -120,7 +268,7 @@ def main() -> None:
reified = [_reify(arg) for arg in args.args]
getattr(modobj, args.function)(*reified)
if args.graph:
_graph.emit()
print(_graph)


def refs(assets: _AssetT) -> Any:
Expand Down Expand Up @@ -235,7 +383,7 @@ def tasknames(obj: object) -> List[str]:
return sorted(name for name in dir(obj) if f(getattr(obj, name)))


# Decorators
# Public task-graph decorator functions:


def external(f: Callable) -> _TaskT:
Expand Down Expand Up @@ -312,7 +460,7 @@ def __iotaa_tasks__(*args, **kwargs) -> _AssetT:
return _set_metadata(f, __iotaa_tasks__)


# Private functions and classes
# Private helper functions:


def _delegate(g: Generator, taskname: str) -> List[Asset]:
Expand Down Expand Up @@ -500,133 +648,3 @@ def _task_initial(f: Callable, *args, **kwargs) -> Tuple[str, bool, Generator]:
g = f(*args, **kwargs)
taskname = next(g)
return taskname, top, g


class _Graph:
"""
Graphviz digraph support.
"""

def __init__(self) -> None:
self.reset()

@property
def color(self) -> Dict[Any, str]:
"""
Graphviz colors.
"""
return defaultdict(lambda: "grey", [(True, "palegreen"), (False, "orange")])

def emit(self) -> None:
"""
Emit a task/asset graph in Graphviz dot format.
"""
f = (
lambda name, shape, ready=None: '%s [fillcolor=%s, label="%s", shape=%s, style=filled]'
% (
self.name(name),
self.color[ready],
name,
shape,
)
)
edges = ["%s -> %s" % (self.name(a), self.name(b)) for a, b in self.edges]
nodes_a = [f(ref, self.shape.asset, ready()) for ref, ready in self.assets.items()]
nodes_t = [f(x, self.shape.task) for x in self.tasks]
print("digraph g {\n %s\n}" % "\n ".join(sorted(nodes_t + nodes_a + edges)))

def name(self, name: str) -> str:
"""
Convert an iotaa asset/task name to a Graphviz-appropriate node name.
:param name: An iotaa asset/task name.
:return: A Graphviz-appropriate node name.
"""
return "_%s" % md5(str(name).encode("utf-8")).hexdigest()

@property
def shape(self) -> ns:
"""
Graphviz shapes.
"""
return ns(asset="box", task="ellipse")

def reset(self) -> None:
"""
Reset graph state.
"""
self.assets: dict = {}
self.edges: set = set()
self.tasks: set = set()

def update_from_requirements(self, taskname: str, alist: List[Asset]) -> None:
"""
Update graph data structures with required-task info.
:param taskname: The current task's name.
:param alist: Flattened required-task assets.
"""
asset_taskname = lambda a: getattr(a, "taskname", None)
self.assets.update({a.ref: a.ready for a in alist})
self.edges |= set((asset_taskname(a), a.ref) for a in alist)
self.edges |= set((taskname, asset_taskname(a)) for a in alist)
self.tasks |= set(asset_taskname(a) for a in alist)
self.tasks.add(taskname)

def update_from_task(self, taskname: str, assets: _AssetT) -> None:
"""
Update graph data structures with current task info.
:param taskname: The current task's name.
:param assets: A collection of assets, one asset, or None.
"""
alist = _listify(assets)
self.assets.update({a.ref: a.ready for a in alist})
self.edges |= set((taskname, a.ref) for a in alist)
self.tasks.add(taskname)


class _Logger:
"""
Support for swappable loggers.
"""

def __init__(self) -> None:
self.logger = logging.getLogger() # default to Python root logger.

def __getattr__(self, attr: str) -> Any:
"""
Delegate attribute access to the currently-used logger.
:param attr: The attribute to access.
:returns: The requested attribute.
"""
return getattr(self.logger, attr)


class _State:
"""
Global iotaa state.
"""

def __init__(self) -> None:
self.dry_run = False
self.initialized = False
self.reset()

def initialize(self) -> None:
"""
Mark iotaa as initialized.
"""
self.initialized = True

def reset(self) -> None:
"""
Reset state.
"""
self.initialized = False


_graph = _Graph()
_log = _Logger()
_state = _State()
Loading

0 comments on commit 31bb296

Please sign in to comment.