Skip to content

Commit

Permalink
Introduce flyte Decks into flytekit (flyteorg#859)
Browse files Browse the repository at this point in the history
* Introduce flyte Decks into flytekit

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated deck interface

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated deck interface

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated dependency

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated comment

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Fixed tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Fixed tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Deck plugin

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Refactor deck interface

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Fixed tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Lint fixed

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Addressed comment

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Fixed tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Fixed plugin tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Fixed spark plugin tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Tests fixed

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Add deck plugin to ci

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Fixed tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* More tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Fix spark tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Fix lint

Signed-off-by: Kevin Su <pingsutw@apache.org>

* address comments

Signed-off-by: Kevin Su <pingsutw@apache.org>

* address comments

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Fixed tests error

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Fixed tests error

Signed-off-by: Kevin Su <pingsutw@apache.org>

* add example and update type hint

Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Mike Zhong <mzhong@embarkvet.com>
  • Loading branch information
pingsutw authored and Mike Zhong committed Apr 11, 2022
1 parent 653f96e commit 3f0edd9
Show file tree
Hide file tree
Showing 33 changed files with 890 additions and 22 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ jobs:
- flytekit-aws-sagemaker
- flytekit-bigquery
- flytekit-data-fsspec
- flytekit-deck-standard
- flytekit-dolt
- flytekit-greatexpectations
- flytekit-hive
Expand Down
1 change: 1 addition & 0 deletions flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
from flytekit.core.task import Secret, reference_task, task
from flytekit.core.workflow import ImperativeWorkflow as Workflow
from flytekit.core.workflow import WorkflowFailurePolicy, reference_workflow, workflow
from flytekit.deck import Deck
from flytekit.extras.persistence import GCSPersistence, HttpPersistence, S3Persistence
from flytekit.loggers import logger
from flytekit.models.common import Annotations, AuthRole, Labels
Expand Down
5 changes: 5 additions & 0 deletions flytekit/configuration/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ def get_specified_images(cfg: ConfigFile) -> typing.Dict[str, str]:
return images


class Deck(object):
    """
    Configuration entries that control flyte Deck output.

    Only a legacy config entry is declared here (no YAML entry is given).
    """

    # Name of the config section the entries below are read from.
    SECTION = "deck"
    # Boolean ``disable_deck`` option in the ``deck`` section.
    DISABLE_DECK = ConfigEntry(LegacyConfigEntry(SECTION, "disable_deck", bool))


class AWS(object):
SECTION = "aws"
S3_ENDPOINT = ConfigEntry(LegacyConfigEntry(SECTION, "endpoint"), YamlConfigEntry("storage.connection.endpoint"))
Expand Down
34 changes: 32 additions & 2 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from typing import Any, Dict, Generic, List, Optional, OrderedDict, Tuple, Type, TypeVar, Union

from flytekit.configuration import SerializationSettings
from flytekit.configuration import internal as _internal
from flytekit.core.context_manager import ExecutionParameters, FlyteContext, FlyteContextManager, FlyteEntities
from flytekit.core.interface import Interface, transform_interface_to_typed_interface
from flytekit.core.local_cache import LocalTaskCache
Expand All @@ -38,6 +39,7 @@
)
from flytekit.core.tracker import TrackedInstance
from flytekit.core.type_engine import TypeEngine
from flytekit.deck.deck import Deck
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import interface as _interface_models
Expand Down Expand Up @@ -228,7 +230,6 @@ def local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Pr
# Promises as essentially inputs from previous task executions
# native constants are just bound to this specific task (default values for a task input)
# Also along with promises and constants, there could be dictionary or list of promises or constants

kwargs = translate_inputs_to_literals(
ctx,
incoming_values=kwargs,
Expand Down Expand Up @@ -364,6 +365,7 @@ def __init__(
task_config: T,
interface: Optional[Interface] = None,
environment: Optional[Dict[str, str]] = None,
disable_deck: bool = False,
**kwargs,
):
"""
Expand All @@ -377,6 +379,7 @@ def __init__(
signature of the task
environment (Optional[Dict[str, str]]): Any environment variables that should be supplied during the
execution of the task. Supplied as a dictionary of key/value pairs
disable_deck (bool): If true, this task will not output deck html file
"""
super().__init__(
task_type=task_type,
Expand All @@ -387,6 +390,7 @@ def __init__(
self._python_interface = interface if interface else Interface()
self._environment = environment if environment else {}
self._task_config = task_config
self._disable_deck = disable_deck

# TODO lets call this interface and the other as flyte_interface?
@property
Expand Down Expand Up @@ -457,10 +461,13 @@ def dispatch_execute(

# Invoked before the task is executed
new_user_params = self.pre_execute(ctx.user_space_params)
from flytekit.deck.deck import _output_deck

new_user_params._decks = [ctx.user_space_params.default_deck]
# Create another execution context with the new user params, but let's keep the same working dir
with FlyteContextManager.with_context(
ctx.with_execution_state(ctx.execution_state.with_params(user_space_params=new_user_params)) # type: ignore
ctx.with_execution_state(ctx.execution_state.with_params(user_space_params=new_user_params))
# type: ignore
) as exec_ctx:
# TODO We could support default values here too - but not part of the plan right now
# Translate the input literals to Python native
Expand Down Expand Up @@ -521,6 +528,22 @@ def dispatch_execute(
f"Failed to convert return value for var {k} for function {self.name} with error {type(e)}: {e}"
) from e

INPUT = "input"
OUTPUT = "output"

input_deck = Deck(INPUT)
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))

output_deck = Deck(OUTPUT)
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))

new_user_params.decks.append(input_deck)
new_user_params.decks.append(output_deck)

if _internal.Deck.DISABLE_DECK.read() is not True and self.disable_deck is False:
_output_deck(self.name.split(".")[-1], new_user_params)
outputs_literal_map = _literal_models.LiteralMap(literals=literals)
# After the execute has been successfully completed
return outputs_literal_map
Expand Down Expand Up @@ -561,6 +584,13 @@ def environment(self) -> Dict[str, str]:
"""
return self._environment

@property
def disable_deck(self) -> bool:
    """
    Whether deck output is turned off for this task.

    If true, this task will not output a deck html file. The value is the ``disable_deck`` argument that was
    stored on the instance as ``_disable_deck``.
    """
    return self._disable_deck


class TaskResolverMixin(object):
"""
Expand Down
36 changes: 34 additions & 2 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from enum import Enum
from typing import Generator, List, Optional, Union

import flytekit
from flytekit.clients import friendly as friendly_client # noqa
from flytekit.configuration import Config, SecretsConfig, SerializationSettings
from flytekit.core import mock_stats, utils
Expand All @@ -40,6 +41,8 @@
# TODO: resolve circular import from flytekit.core.python_auto_container import TaskResolverMixin

# Enables static type checking https://docs.python.org/3/library/typing.html#typing.TYPE_CHECKING


if typing.TYPE_CHECKING:
from flytekit.core.base_task import TaskResolverMixin

Expand Down Expand Up @@ -79,6 +82,7 @@ class Builder(object):
attrs: typing.Dict[str, typing.Any]
working_dir: typing.Union[os.PathLike, utils.AutoDeletingTempDir]
checkpoint: typing.Optional[Checkpoint]
decks: List[flytekit.Deck]
raw_output_prefix: str

def __init__(self, current: typing.Optional[ExecutionParameters] = None):
Expand All @@ -88,6 +92,7 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.execution_id = current.execution_id if current else None
self.logging = current.logging if current else None
self.checkpoint = current._checkpoint if current else None
self.decks = current._decks if current else []
self.attrs = current._attrs if current else {}
self.raw_output_prefix = current.raw_output_prefix if current else None

Expand All @@ -105,6 +110,7 @@ def build(self) -> ExecutionParameters:
execution_id=self.execution_id,
logging=self.logging,
checkpoint=self.checkpoint,
decks=self.decks,
raw_output_prefix=self.raw_output_prefix,
**self.attrs,
)
Expand All @@ -131,17 +137,28 @@ def builder(self) -> Builder:
return ExecutionParameters.Builder(current=self)

def __init__(
self, execution_date, tmp_dir, stats, execution_id, logging, raw_output_prefix, checkpoint=None, **kwargs
self,
execution_date,
tmp_dir,
stats,
execution_id,
logging,
raw_output_prefix,
checkpoint=None,
decks=None,
**kwargs,
):
"""
Args:
execution_date: Date when the execution is running
tmp_dir: temporary directory for the execution
stats: handle to emit stats
execution_id: Identifier for the xecution
execution_id: Identifier for the execution
logging: handle to logging
checkpoint: Checkpoint Handle to the configured checkpoint system
"""
if decks is None:
decks = []
self._stats = stats
self._execution_date = execution_date
self._working_directory = tmp_dir
Expand All @@ -153,6 +170,7 @@ def __init__(
# It is safe to recreate the Secrets Manager
self._secrets_manager = SecretsManager()
self._checkpoint = checkpoint
self._decks = decks

@property
def stats(self) -> taggable.TaggableStats:
Expand Down Expand Up @@ -220,6 +238,19 @@ def checkpoint(self) -> Checkpoint:
raise NotImplementedError("Checkpointing is not available, please check the version of the platform.")
return self._checkpoint

@property
def decks(self) -> typing.List:
    """
    A list of decks of the task; it will be rendered to html at the end of the task execution.

    The stored list itself is returned (not a copy), so appending to the result adds a deck to these
    execution parameters.
    """
    return self._decks

@property
def default_deck(self) -> "Deck":
    """
    Create and return a deck named ``"default"``.

    Every access constructs a new ``Deck`` instance; nothing is cached on these execution parameters.
    """
    # Imported inside the property rather than at module level - presumably to avoid a circular import
    # between flytekit and this module; confirm before hoisting it.
    from flytekit import Deck

    return Deck("default")

def __getattr__(self, attr_name: str) -> typing.Any:
"""
This houses certain task specific context. For example in Spark, it houses the SparkSession, etc
Expand Down Expand Up @@ -760,6 +791,7 @@ def initialize():
logging=user_space_logger,
tmp_dir=user_space_path,
raw_output_prefix=default_context.file_access._raw_output_prefix,
decks=[],
)

default_context = default_context.with_execution_state(
Expand Down
3 changes: 3 additions & 0 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def task(
secret_requests: Optional[List[Secret]] = None,
execution_mode: Optional[PythonFunctionTask.ExecutionBehavior] = PythonFunctionTask.ExecutionBehavior.DEFAULT,
task_resolver: Optional[TaskResolverMixin] = None,
disable_deck: bool = False,
) -> Union[Callable, PythonFunctionTask]:
"""
This is the core decorator to use for any task type in flytekit.
Expand Down Expand Up @@ -177,6 +178,7 @@ def foo2():
may change based on the backend provider.
:param execution_mode: This is mainly for internal use. Please ignore. It is filled in automatically.
:param task_resolver: Provide a custom task resolver.
:param disable_deck: If true, this task will not output deck html file
"""

def wrapper(fn) -> PythonFunctionTask:
Expand All @@ -201,6 +203,7 @@ def wrapper(fn) -> PythonFunctionTask:
secret_requests=secret_requests,
execution_mode=execution_mode,
task_resolver=task_resolver,
disable_deck=disable_deck,
)
update_wrapper(task_instance, fn)
return task_instance
Expand Down
19 changes: 19 additions & 0 deletions flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type:
f"Conversion to python value expected type {expected_python_type} from literal not implemented"
)

def to_html(self, ctx: FlyteContext, python_val: T, expected_python_type: Type[T]) -> str:
    """
    Converts any python val (dataframe, int, float) to a html string, and it will be wrapped in the HTML div.

    This is a default implementation, not an abstract hook: transformers that do not override it simply
    render ``str(python_val)``. It is deliberately not marked ``@abstractmethod`` so that existing
    transformer subclasses keep working without having to implement it.

    :param ctx: the current Flyte context (unused by the default implementation)
    :param python_val: the python value to render
    :param expected_python_type: the declared python type of the value (unused by the default implementation)
    :return: the string form of ``python_val``
    """
    # NOTE(review): the text is returned as-is, without HTML escaping.
    return str(python_val)

def __repr__(self):
return f"{self._name} Transforms ({self._t}) to Flyte native"

Expand Down Expand Up @@ -692,6 +699,18 @@ def to_python_value(cls, ctx: FlyteContext, lv: Literal, expected_python_type: T
transformer = cls.get_transformer(expected_python_type)
return transformer.to_python_value(ctx, lv, expected_python_type)

@classmethod
def to_html(cls, ctx: FlyteContext, python_val: typing.Any, expected_python_type: Type[T]) -> str:
    """
    Render ``python_val`` as an html string.

    For an ``Annotated`` type, the first annotation argument that is a ``Renderable`` produces the html.
    Otherwise the transformer registered for the type is asked to render the value.
    """
    # The transformer is resolved from the type exactly as declared, before any Annotated wrapper is unpacked.
    transformer = cls.get_transformer(expected_python_type)
    if get_origin(expected_python_type) is not Annotated:
        return transformer.to_html(ctx, python_val, expected_python_type)

    type_args = get_args(expected_python_type)
    expected_python_type = type_args[0]
    extras = type_args[1:]
    from flytekit.deck.renderer import Renderable

    for extra in extras:
        if isinstance(extra, Renderable):
            # A renderer supplied through Annotated overrides the transformer's rendering.
            return extra.to_html(python_val)
    return transformer.to_html(ctx, python_val, expected_python_type)

@classmethod
def named_tuple_to_variable_map(cls, t: typing.NamedTuple) -> _interface_models.VariableMap:
"""
Expand Down
2 changes: 2 additions & 0 deletions flytekit/deck/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .deck import Deck
from .renderer import TopFrameRenderer
100 changes: 100 additions & 0 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import os
from typing import Dict, Optional

from jinja2 import Environment, FileSystemLoader

from flytekit.core.context_manager import ExecutionParameters, FlyteContext, FlyteContextManager


class Deck:
    """
    Deck enables users to get customizable and default visibility into their tasks.

    A deck holds html produced by renderers (FrameRenderer, MarkdownRenderer). For example,
    FrameRenderer can render a DataFrame as an HTML table, and MarkdownRenderer can convert
    a Markdown string to HTML.

    Flyte context saves a list of deck objects, and the html of those decks is written out
    as HTML files when the task is executed.

    Each task has at least three decks (input, output, default). Input/output decks are
    used to render tasks' input/output data, and the default deck is used to render line plots,
    scatter plots or markdown text. In addition, users can create new decks to render
    their data with custom renderers.

    .. warning::

        This feature is in beta.

    .. code-block:: python

        iris_df = px.data.iris()

        @task()
        def t1() -> str:
            md_text = "#Hello Flyte\\n##Hello Flyte\\n###Hello Flyte"
            m = MarkdownRenderer()
            s = BoxRenderer("sepal_length")
            deck = flytekit.Deck("demo", s.to_html(iris_df))
            deck.append(m.to_html(md_text))
            default_deck = flytekit.current_context().default_deck
            default_deck.append(m.to_html(md_text))
            return md_text

        # Use Annotated to override default renderer
        @task()
        def t2() -> Annotated[pd.DataFrame, TopFrameRenderer(10)]:
            return iris_df
    """

    def __init__(self, name: str, html: Optional[str] = ""):
        """
        Create a deck and register it with the current execution.

        :param name: deck name; also used as the stem of the html file written for this deck
        :param html: initial html content; ``None`` is treated as an empty string
        """
        self._name = name
        # ``html`` is declared Optional: store "" for None so that append() and the file
        # writer always work with a str instead of failing later on None.
        self._html = "" if html is None else html
        # Creating a deck has a side effect: it is appended to the decks of the current
        # context's user space params.
        FlyteContextManager.current_context().user_space_params.decks.append(self)

    def append(self, html: str) -> "Deck":
        """
        Add ``html`` to the end of this deck, separated from the existing content by a newline.

        :param html: html fragment to add; must be a ``str``
        :return: this deck, so calls can be chained
        """
        assert isinstance(html, str)
        self._html = self._html + "\n" + html
        return self

    @property
    def name(self) -> str:
        """Name of the deck."""
        return self._name

    @property
    def html(self) -> str:
        """Html content accumulated in the deck so far."""
        return self._html


def _output_deck(task_name: str, new_user_params: ExecutionParameters):
    """
    Write the decks of a task execution to local html files.

    Each deck in ``new_user_params.decks`` is written to its own html file in a fresh local directory,
    then ``template.html`` is rendered with the deck-name -> file-name mapping and saved as ``deck.html``
    in the same directory. The location of ``deck.html`` is printed to stdout.

    :param task_name: name shown in the printed message
    :param new_user_params: execution parameters whose ``decks`` are written out
    """
    deck_map: Dict[str, str] = {}
    ctx = FlyteContext.current_context()

    # TODO: upload deck file to remote filesystems (s3, gcs)
    output_dir = ctx.file_access.get_random_local_directory()

    for deck in new_user_params.decks:
        _deck_to_html_file(deck, deck_map, output_dir)

    # The template lives in the "html" directory next to this module.
    root = os.path.dirname(os.path.abspath(__file__))
    templates_dir = os.path.join(root, "html")
    env = Environment(loader=FileSystemLoader(templates_dir))
    template = env.get_template("template.html")

    deck_path = os.path.join(output_dir, "deck.html")
    # Write with an explicit encoding: the platform default may be unable to encode
    # non-ASCII characters in the rendered page.
    with open(deck_path, "w", encoding="utf-8") as f:
        f.write(template.render(metadata=deck_map))

    print(f"{task_name} output flytekit deck html to file://{deck_path}")


def _deck_to_html_file(deck: "Deck", deck_map: Dict[str, str], output_dir: str):
    """
    Write one deck's html to ``<output_dir>/<deck name>.html`` and record the file in ``deck_map``.

    :param deck: deck to write; its ``name`` and ``html`` are used
    :param deck_map: mapping of deck name -> file name, updated in place
    :param output_dir: existing directory the html file is written to
    """
    file_name = deck.name + ".html"
    path = os.path.join(output_dir, file_name)
    # Write with an explicit encoding: deck html can contain arbitrary user data, which the
    # platform default encoding may be unable to represent.
    with open(path, "w", encoding="utf-8") as output:
        deck_map[deck.name] = file_name
        output.write(deck.html)
Empty file added flytekit/deck/html/__init__.py
Empty file.
Loading

0 comments on commit 3f0edd9

Please sign in to comment.