Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce flyte Decks into flytekit #859

Merged
merged 36 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4959b41
Introduce flyte Decks into flytekit
pingsutw Feb 17, 2022
4f2d48d
Updated deck interface
pingsutw Mar 3, 2022
6e669d3
Merge branch 'master' of github.com:flyteorg/flytekit into deck
pingsutw Mar 3, 2022
50a49e9
Updated deck interface
pingsutw Mar 3, 2022
10890f0
Updated dependency
pingsutw Mar 3, 2022
e8d98ba
Updated comment
pingsutw Mar 4, 2022
1eace7a
Fixed tests
pingsutw Mar 4, 2022
6c6712c
Fixed tests
pingsutw Mar 4, 2022
3890b78
Deck plugin
pingsutw Mar 8, 2022
f2fe059
Refactor deck interface
pingsutw Mar 17, 2022
2a82127
Merged master
pingsutw Mar 21, 2022
85d063f
Updated tests
pingsutw Mar 21, 2022
77b2b94
Updated tests
pingsutw Mar 21, 2022
6bf8ac1
Updated tests
pingsutw Mar 22, 2022
8a88f4c
Updated tests
pingsutw Mar 22, 2022
11319b9
Updated tests
pingsutw Mar 22, 2022
f5361d0
Updated tests
pingsutw Mar 22, 2022
dfbe437
Fixed tests
pingsutw Mar 22, 2022
2ac7936
Merged master
pingsutw Mar 22, 2022
1277718
Lint fixed
pingsutw Mar 22, 2022
31fd500
Addressed comment
pingsutw Mar 30, 2022
72144dc
Fixed tests
pingsutw Mar 30, 2022
98d5045
Fixed plugin tests
pingsutw Mar 30, 2022
736c935
Fixed spark plugin tests
pingsutw Mar 30, 2022
5b7c407
Merge branch 'master' of github.com:flyteorg/flytekit into deck
pingsutw Mar 31, 2022
9d5fb3e
Tests fixed
pingsutw Mar 31, 2022
9fc0f4b
Add deck plugin to ci
pingsutw Mar 31, 2022
163aa39
Fixed tests
pingsutw Mar 31, 2022
6150ddb
More tests
pingsutw Mar 31, 2022
cdc4bcf
Fix spark tests
pingsutw Apr 1, 2022
3373b2d
Fix lint
pingsutw Apr 1, 2022
cb545e9
address comments
pingsutw Apr 4, 2022
8a8f7c2
address comments
pingsutw Apr 4, 2022
8e94c2e
Fixed tests error
pingsutw Apr 4, 2022
987e058
Fixed tests error
pingsutw Apr 4, 2022
948f00c
add example and update type hint
pingsutw Apr 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ jobs:
- flytekit-aws-sagemaker
- flytekit-bigquery
- flytekit-data-fsspec
- flytekit-deck-standard
- flytekit-dolt
- flytekit-greatexpectations
- flytekit-hive
Expand Down
1 change: 1 addition & 0 deletions flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
from flytekit.core.task import Secret, reference_task, task
from flytekit.core.workflow import ImperativeWorkflow as Workflow
from flytekit.core.workflow import WorkflowFailurePolicy, reference_workflow, workflow
from flytekit.deck import Deck
from flytekit.extras.persistence import GCSPersistence, HttpPersistence, S3Persistence
from flytekit.loggers import logger
from flytekit.models.common import Annotations, AuthRole, Labels
Expand Down
5 changes: 5 additions & 0 deletions flytekit/configuration/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ def get_specified_images(cfg: ConfigFile) -> typing.Dict[str, str]:
return images


class Deck(object):
SECTION = "deck"
DISABLE_DECK = ConfigEntry(LegacyConfigEntry(SECTION, "disable_deck", bool))


class AWS(object):
SECTION = "aws"
S3_ENDPOINT = ConfigEntry(LegacyConfigEntry(SECTION, "endpoint"), YamlConfigEntry("storage.connection.endpoint"))
Expand Down
34 changes: 32 additions & 2 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from typing import Any, Dict, Generic, List, Optional, OrderedDict, Tuple, Type, TypeVar, Union

from flytekit.configuration import SerializationSettings
from flytekit.configuration import internal as _internal
from flytekit.core.context_manager import ExecutionParameters, FlyteContext, FlyteContextManager, FlyteEntities
from flytekit.core.interface import Interface, transform_interface_to_typed_interface
from flytekit.core.local_cache import LocalTaskCache
Expand All @@ -38,6 +39,7 @@
)
from flytekit.core.tracker import TrackedInstance
from flytekit.core.type_engine import TypeEngine
from flytekit.deck.deck import Deck
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import interface as _interface_models
Expand Down Expand Up @@ -228,7 +230,6 @@ def local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Pr
# Promises as essentially inputs from previous task executions
# native constants are just bound to this specific task (default values for a task input)
# Also along with promises and constants, there could be dictionary or list of promises or constants

kwargs = translate_inputs_to_literals(
ctx,
incoming_values=kwargs,
Expand Down Expand Up @@ -364,6 +365,7 @@ def __init__(
task_config: T,
interface: Optional[Interface] = None,
environment: Optional[Dict[str, str]] = None,
disable_deck: bool = False,
**kwargs,
):
"""
Expand All @@ -377,6 +379,7 @@ def __init__(
signature of the task
environment (Optional[Dict[str, str]]): Any environment variables that should be supplied during the
execution of the task. Supplied as a dictionary of key/value pairs
disable_deck (bool): If true, this task will not output deck html file
"""
super().__init__(
task_type=task_type,
Expand All @@ -387,6 +390,7 @@ def __init__(
self._python_interface = interface if interface else Interface()
self._environment = environment if environment else {}
self._task_config = task_config
self._disable_deck = disable_deck

# TODO lets call this interface and the other as flyte_interface?
@property
Expand Down Expand Up @@ -457,10 +461,13 @@ def dispatch_execute(

# Invoked before the task is executed
new_user_params = self.pre_execute(ctx.user_space_params)
from flytekit.deck.deck import _output_deck

new_user_params._decks = [ctx.user_space_params.default_deck]
# Create another execution context with the new user params, but let's keep the same working dir
with FlyteContextManager.with_context(
ctx.with_execution_state(ctx.execution_state.with_params(user_space_params=new_user_params)) # type: ignore
ctx.with_execution_state(ctx.execution_state.with_params(user_space_params=new_user_params))
# type: ignore
) as exec_ctx:
# TODO We could support default values here too - but not part of the plan right now
# Translate the input literals to Python native
Expand Down Expand Up @@ -521,6 +528,22 @@ def dispatch_execute(
f"Failed to convert return value for var {k} for function {self.name} with error {type(e)}: {e}"
) from e

INPUT = "input"
OUTPUT = "output"

input_deck = Deck(INPUT)
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))

output_deck = Deck(OUTPUT)
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))

new_user_params.decks.append(input_deck)
new_user_params.decks.append(output_deck)

if _internal.Deck.DISABLE_DECK.read() is not True and self.disable_deck is False:
_output_deck(self.name.split(".")[-1], new_user_params)
outputs_literal_map = _literal_models.LiteralMap(literals=literals)
# After the execute has been successfully completed
return outputs_literal_map
Expand Down Expand Up @@ -561,6 +584,13 @@ def environment(self) -> Dict[str, str]:
"""
return self._environment

@property
def disable_deck(self) -> bool:
"""
If true, this task will not output deck html file
"""
return self._disable_deck


class TaskResolverMixin(object):
"""
Expand Down
35 changes: 33 additions & 2 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
# TODO: resolve circular import from flytekit.core.python_auto_container import TaskResolverMixin

# Enables static type checking https://docs.python.org/3/library/typing.html#typing.TYPE_CHECKING


if typing.TYPE_CHECKING:
from flytekit.core.base_task import TaskResolverMixin

Expand Down Expand Up @@ -79,6 +81,7 @@ class Builder(object):
attrs: typing.Dict[str, typing.Any]
working_dir: typing.Union[os.PathLike, utils.AutoDeletingTempDir]
checkpoint: typing.Optional[Checkpoint]
decks: List
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

list of?

raw_output_prefix: str

def __init__(self, current: typing.Optional[ExecutionParameters] = None):
Expand All @@ -88,6 +91,7 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.execution_id = current.execution_id if current else None
self.logging = current.logging if current else None
self.checkpoint = current._checkpoint if current else None
self.decks = current._decks if current else []
self.attrs = current._attrs if current else {}
self.raw_output_prefix = current.raw_output_prefix if current else None

Expand All @@ -105,6 +109,7 @@ def build(self) -> ExecutionParameters:
execution_id=self.execution_id,
logging=self.logging,
checkpoint=self.checkpoint,
decks=self.decks,
raw_output_prefix=self.raw_output_prefix,
**self.attrs,
)
Expand All @@ -131,17 +136,28 @@ def builder(self) -> Builder:
return ExecutionParameters.Builder(current=self)

def __init__(
self, execution_date, tmp_dir, stats, execution_id, logging, raw_output_prefix, checkpoint=None, **kwargs
self,
execution_date,
tmp_dir,
stats,
execution_id,
logging,
raw_output_prefix,
checkpoint=None,
decks=None,
**kwargs,
):
"""
Args:
execution_date: Date when the execution is running
tmp_dir: temporary directory for the execution
stats: handle to emit stats
execution_id: Identifier for the xecution
execution_id: Identifier for the execution
logging: handle to logging
checkpoint: Checkpoint Handle to the configured checkpoint system
"""
if decks is None:
decks = []
self._stats = stats
self._execution_date = execution_date
self._working_directory = tmp_dir
Expand All @@ -153,6 +169,7 @@ def __init__(
# It is safe to recreate the Secrets Manager
self._secrets_manager = SecretsManager()
self._checkpoint = checkpoint
self._decks = decks

@property
def stats(self) -> taggable.TaggableStats:
Expand Down Expand Up @@ -220,6 +237,19 @@ def checkpoint(self) -> Checkpoint:
raise NotImplementedError("Checkpointing is not available, please check the version of the platform.")
return self._checkpoint

@property
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
def decks(self) -> typing.List:
"""
A list of decks of the tasks, and it will be rendered to a html at the end of the task execution.
"""
return self._decks

@property
def default_deck(self) -> "Deck":
from flytekit import Deck

return Deck("default")

def __getattr__(self, attr_name: str) -> typing.Any:
"""
This houses certain task specific context. For example in Spark, it houses the SparkSession, etc
Expand Down Expand Up @@ -760,6 +790,7 @@ def initialize():
logging=user_space_logger,
tmp_dir=user_space_path,
raw_output_prefix=default_context.file_access._raw_output_prefix,
decks=[],
)

default_context = default_context.with_execution_state(
Expand Down
3 changes: 3 additions & 0 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def task(
secret_requests: Optional[List[Secret]] = None,
execution_mode: Optional[PythonFunctionTask.ExecutionBehavior] = PythonFunctionTask.ExecutionBehavior.DEFAULT,
task_resolver: Optional[TaskResolverMixin] = None,
disable_deck: bool = False,
) -> Union[Callable, PythonFunctionTask]:
"""
This is the core decorator to use for any task type in flytekit.
Expand Down Expand Up @@ -177,6 +178,7 @@ def foo2():
may change based on the backend provider.
:param execution_mode: This is mainly for internal use. Please ignore. It is filled in automatically.
:param task_resolver: Provide a custom task resolver.
:param disable_deck: If true, this task will not output deck html file
"""

def wrapper(fn) -> PythonFunctionTask:
Expand All @@ -201,6 +203,7 @@ def wrapper(fn) -> PythonFunctionTask:
secret_requests=secret_requests,
execution_mode=execution_mode,
task_resolver=task_resolver,
disable_deck=disable_deck,
)
update_wrapper(task_instance, fn)
return task_instance
Expand Down
19 changes: 19 additions & 0 deletions flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type:
f"Conversion to python value expected type {expected_python_type} from literal not implemented"
)

@abstractmethod
def to_html(self, ctx: FlyteContext, python_val: T, expected_python_type: Type[T]) -> str:
"""
Converts any python val (dataframe, int, float) to a html string, and it will be wrapped in the HTML div
"""
return str(python_val)

def __repr__(self):
return f"{self._name} Transforms ({self._t}) to Flyte native"

Expand Down Expand Up @@ -692,6 +699,18 @@ def to_python_value(cls, ctx: FlyteContext, lv: Literal, expected_python_type: T
transformer = cls.get_transformer(expected_python_type)
return transformer.to_python_value(ctx, lv, expected_python_type)

@classmethod
def to_html(cls, ctx: FlyteContext, python_val: typing.Any, expected_python_type: Type[T]) -> str:
transformer = cls.get_transformer(expected_python_type)
if get_origin(expected_python_type) is Annotated:
expected_python_type, *annotate_args = get_args(expected_python_type)
from flytekit.deck.renderer import Renderable

for arg in annotate_args:
if isinstance(arg, Renderable):
return arg.to_html(python_val)
return transformer.to_html(ctx, python_val, expected_python_type)

@classmethod
def named_tuple_to_variable_map(cls, t: typing.NamedTuple) -> _interface_models.VariableMap:
"""
Expand Down
2 changes: 2 additions & 0 deletions flytekit/deck/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .deck import Deck
from .renderer import TopFrameRenderer
79 changes: 79 additions & 0 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import os
from typing import Dict, Optional

from jinja2 import Environment, FileSystemLoader

from flytekit.core.context_manager import ExecutionParameters, FlyteContext, FlyteContextManager


class Deck:
"""
Deck enable users to get customizable and default visibility into their tasks.

Deck contains a list of renderers (FrameRenderer, MarkdownRenderer) that can
generate a html file. For example, FrameRenderer can render a DataFrame as an HTML table,
MarkdownRenderer can convert Markdown string to HTML

Flyte context saves a list of deck objects, and we use renderers in those decks to render
the data and create an HTML file when those tasks are executed

Each task has a least three decks (input, output, default). Input/output decks are
used to render tasks' input/output data, and the default deck is used to render line plots,
scatter plots or markdown text. In addition, users can create new decks to render
their data with custom renderers.

.. warning::

This feature is in beta.

"""

def __init__(self, name: str, html: Optional[str] = ""):
self._name = name
# self.renderers = renderers if isinstance(renderers, list) else [renderers]
self._html = html
FlyteContextManager.current_context().user_space_params.decks.append(self)

def append(self, html: str) -> "Deck":
assert isinstance(html, str)
self._html = self._html + "\n" + html
return self

@property
def name(self) -> str:
return self._name

@property
def html(self) -> str:
return self._html


def _output_deck(task_name: str, new_user_params: ExecutionParameters):
deck_map: Dict[str, str] = {}
decks = new_user_params.decks
ctx = FlyteContext.current_context()

# TODO: upload deck file to remote filesystems (s3, gcs)
output_dir = ctx.file_access.get_random_local_directory()

for deck in decks:
_deck_to_html_file(deck, deck_map, output_dir)

root = os.path.dirname(os.path.abspath(__file__))
templates_dir = os.path.join(root, "html")
env = Environment(loader=FileSystemLoader(templates_dir))
template = env.get_template("template.html")

deck_path = os.path.join(output_dir, "deck.html")
with open(deck_path, "w") as f:
f.write(template.render(metadata=deck_map))

print(f"{task_name} output flytekit deck html to file://{deck_path}")


def _deck_to_html_file(deck: Deck, deck_map: Dict[str, str], output_dir: str):
file_name = deck.name + ".html"
path = os.path.join(output_dir, file_name)
with open(path, "w") as output:
deck_map[deck.name] = file_name
output.write(deck.html)
Empty file.
Loading