Skip to content

Commit

Permalink
Feature/3506: Improve Type Conversion errors, use rich to prettify er…
Browse files Browse the repository at this point in the history
…ror messages (#1582)

* improve input type conversion error

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

* fix lint

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

* fix

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

* add tests

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

* add tests

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

* add rich

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

* fix lint

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

* remove prototyping script, update loggers

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

* update __init__.py

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

* update logger

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

* update logger

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

* fix GE and pandera tests

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

* fix lint

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

---------

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>
  • Loading branch information
cosmicBboy authored Apr 17, 2023
1 parent 6e4d34c commit e865db5
Show file tree
Hide file tree
Showing 16 changed files with 286 additions and 74 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ htmlcov
*.ipynb
*dat
docs/source/_tags/
.hypothesis
1 change: 1 addition & 0 deletions dev-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

git+https://github.com/flyteorg/pytest-flyte@main#egg=pytest-flyte
coverage[toml]
hypothesis
joblib
mock
pytest
Expand Down
5 changes: 5 additions & 0 deletions flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@
import sys
from typing import Generator

from rich import traceback

if sys.version_info < (3, 10):
from importlib_metadata import entry_points
else:
Expand Down Expand Up @@ -297,3 +299,6 @@ def load_implicit_plugins():

# Load all implicit plugins
load_implicit_plugins()

# Pretty-print exception messages
traceback.install(width=None, extra_lines=0)
41 changes: 27 additions & 14 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
translate_inputs_to_literals,
)
from flytekit.core.tracker import TrackedInstance
from flytekit.core.type_engine import TypeEngine
from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError
from flytekit.deck.deck import Deck
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
Expand Down Expand Up @@ -245,12 +245,17 @@ def local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Pr
# Promises as essentially inputs from previous task executions
# native constants are just bound to this specific task (default values for a task input)
# Also along with promises and constants, there could be dictionary or list of promises or constants
kwargs = translate_inputs_to_literals(
ctx,
incoming_values=kwargs,
flyte_interface_types=self.interface.inputs,
native_types=self.get_input_types(), # type: ignore
)
try:
kwargs = translate_inputs_to_literals(
ctx,
incoming_values=kwargs,
flyte_interface_types=self.interface.inputs,
native_types=self.get_input_types(), # type: ignore
)
except TypeTransformerFailedError as exc:
msg = f"Failed to convert inputs of task '{self.name}':\n {exc}"
logger.error(msg)
raise TypeError(msg) from exc
input_literal_map = _literal_models.LiteralMap(literals=kwargs)

# if metadata.cache is set, check memoized version
Expand Down Expand Up @@ -515,7 +520,14 @@ def dispatch_execute(
) as exec_ctx:
# TODO We could support default values here too - but not part of the plan right now
# Translate the input literals to Python native
native_inputs = TypeEngine.literal_map_to_kwargs(exec_ctx, input_literal_map, self.python_interface.inputs)
try:
native_inputs = TypeEngine.literal_map_to_kwargs(
exec_ctx, input_literal_map, self.python_interface.inputs
)
except Exception as exc:
msg = f"Failed to convert inputs of task '{self.name}':\n {exc}"
logger.error(msg)
raise type(exc)(msg) from exc

# TODO: Logger should auto inject the current context information to indicate if the task is running within
# a workflow or a subworkflow etc
Expand Down Expand Up @@ -559,19 +571,20 @@ def dispatch_execute(
# We manually construct a LiteralMap here because task inputs and outputs actually violate the assumption
# built into the IDL that all the values of a literal map are of the same type.
literals = {}
for k, v in native_outputs_as_map.items():
for i, (k, v) in enumerate(native_outputs_as_map.items()):
literal_type = self._outputs_interface[k].type
py_type = self.get_type_for_output_var(k, v)

if isinstance(v, tuple):
raise TypeError(f"Output({k}) in task{self.name} received a tuple {v}, instead of {py_type}")
raise TypeError(f"Output({k}) in task '{self.name}' received a tuple {v}, instead of {py_type}")
try:
literals[k] = TypeEngine.to_literal(exec_ctx, v, py_type, literal_type)
except Exception as e:
logger.error(f"Failed to convert return value for var {k} with error {type(e)}: {e}")
raise TypeError(
f"Failed to convert return value for var {k} for function {self.name} with error {type(e)}: {e}"
) from e
# only show the name of output key if it's user-defined (by default Flyte names these as "o<n>")
key = k if k != f"o{i}" else i
msg = f"Failed to convert outputs of task '{self.name}' at position {key}:\n {e}"
logger.error(msg)
raise TypeError(msg) from e

if self._disable_deck is False:
INPUT = "input"
Expand Down
21 changes: 15 additions & 6 deletions flytekit/core/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)
from flytekit.core.interface import Interface
from flytekit.core.node import Node
from flytekit.core.type_engine import DictTransformer, ListTransformer, TypeEngine
from flytekit.core.type_engine import DictTransformer, ListTransformer, TypeEngine, TypeTransformerFailedError
from flytekit.exceptions import user as _user_exceptions
from flytekit.models import interface as _interface_models
from flytekit.models import literals as _literal_models
Expand Down Expand Up @@ -147,7 +147,10 @@ def extract_value(
raise ValueError(f"Received unexpected keyword argument {k}")
var = flyte_interface_types[k]
t = native_types[k]
result[k] = extract_value(ctx, v, t, var.type)
try:
result[k] = extract_value(ctx, v, t, var.type)
except TypeTransformerFailedError as exc:
raise TypeTransformerFailedError(f"Failed argument '{k}': {exc}") from exc

return result

Expand Down Expand Up @@ -483,10 +486,14 @@ def create_native_named_tuple(

if isinstance(promises, Promise):
k, v = [(k, v) for k, v in entity_interface.outputs.items()][0] # get output native type
# only show the name of output key if it's user-defined (by default Flyte names these as "o<n>")
key = k if k != "o0" else 0
try:
return TypeEngine.to_python_value(ctx, promises.val, v)
except Exception as e:
raise AssertionError(f"Failed to convert value of output {k}, expected type {v}.") from e
raise TypeError(
f"Failed to convert output in position {key} of value {promises.val}, expected type {v}."
) from e

if len(cast(Tuple[Promise], promises)) == 0:
return None
Expand All @@ -496,7 +503,7 @@ def create_native_named_tuple(
named_tuple_name = entity_interface.output_tuple_name

outputs = {}
for p in cast(Tuple[Promise], promises):
for i, p in enumerate(cast(Tuple[Promise], promises)):
if not isinstance(p, Promise):
raise AssertionError(
"Workflow outputs can only be promises that are returned by tasks. Found a value of"
Expand All @@ -506,7 +513,9 @@ def create_native_named_tuple(
try:
outputs[p.var] = TypeEngine.to_python_value(ctx, p.val, t)
except Exception as e:
raise AssertionError(f"Failed to convert value of output {p.var}, expected type {t}.") from e
# only show the name of output key if it's user-defined (by default Flyte names these as "o<n>")
key = p.var if p.var != f"o{i}" else i
raise TypeError(f"Failed to convert output in position {key} of value {p.val}, expected type {t}.") from e

# Should this class be part of the Interface?
nt = collections.namedtuple(named_tuple_name, list(outputs.keys())) # type: ignore
Expand Down Expand Up @@ -1063,7 +1072,7 @@ def flyte_entity_call_handler(
for k, v in kwargs.items():
if k not in cast(SupportsNodeCreation, entity).python_interface.inputs:
raise ValueError(
f"Received unexpected keyword argument {k} in function {cast(SupportsNodeCreation, entity).name}"
f"Received unexpected keyword argument '{k}' in function '{cast(SupportsNodeCreation, entity).name}'"
)

ctx = FlyteContextManager.current_context()
Expand Down
16 changes: 12 additions & 4 deletions flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def type_assertions_enabled(self) -> bool:

def assert_type(self, t: Type[T], v: T):
if not hasattr(t, "__origin__") and not isinstance(v, t):
raise TypeTransformerFailedError(f"Type of Val '{v}' is not an instance of {t}")
raise TypeTransformerFailedError(f"Expected value of type {t} but got '{v}' of type {type(v)}")

@abstractmethod
def get_literal_type(self, t: Type[T]) -> LiteralType:
Expand Down Expand Up @@ -166,7 +166,9 @@ def get_literal_type(self, t: Optional[Type[T]] = None) -> LiteralType:

def to_literal(self, ctx: FlyteContext, python_val: T, python_type: Type[T], expected: LiteralType) -> Literal:
if type(python_val) != self._type:
raise TypeTransformerFailedError(f"Expected value of type {self._type} but got type {type(python_val)}")
raise TypeTransformerFailedError(
f"Expected value of type {self._type} but got '{python_val}' of type {type(python_val)}"
)
return self._to_literal_transformer(python_val)

def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
Expand All @@ -185,7 +187,7 @@ def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type:
return res
except AttributeError:
# Assume that this is because a property on `lv` was None
raise TypeTransformerFailedError(f"Cannot convert literal {lv}")
raise TypeTransformerFailedError(f"Cannot convert literal {lv} to {self._type}")

def guess_python_type(self, literal_type: LiteralType) -> Type[T]:
if literal_type.simple is not None and literal_type.simple == self._lt.simple:
Expand Down Expand Up @@ -864,7 +866,13 @@ def literal_map_to_kwargs(
raise ValueError(
f"Received more input values {len(lm.literals)}" f" than allowed by the input spec {len(python_types)}"
)
return {k: TypeEngine.to_python_value(ctx, lm.literals[k], python_types[k]) for k, v in lm.literals.items()}
kwargs = {}
for i, k in enumerate(lm.literals):
try:
kwargs[k] = TypeEngine.to_python_value(ctx, lm.literals[k], python_types[k])
except TypeTransformerFailedError as exc:
raise TypeTransformerFailedError(f"Error converting input '{k}' at position {i}:\n {exc}") from exc
return kwargs

@classmethod
def dict_to_literal_map(
Expand Down
15 changes: 12 additions & 3 deletions flytekit/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from flytekit.core.python_auto_container import PythonAutoContainerTask
from flytekit.core.reference_entity import ReferenceEntity, WorkflowReference
from flytekit.core.tracker import extract_task_module
from flytekit.core.type_engine import TypeEngine
from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError
from flytekit.exceptions import scopes as exception_scopes
from flytekit.exceptions.user import FlyteValidationException, FlyteValueException
from flytekit.loggers import logger
Expand Down Expand Up @@ -260,7 +260,11 @@ def __call__(self, *args, **kwargs) -> Union[Tuple[Promise], Promise, VoidPromis
input_kwargs = self.python_interface.default_inputs_as_kwargs
input_kwargs.update(kwargs)
self.compile()
return flyte_entity_call_handler(self, *args, **input_kwargs)
try:
return flyte_entity_call_handler(self, *args, **input_kwargs)
except Exception as exc:
exc.args = (f"Encountered error while executing workflow '{self.name}':\n {exc}", *exc.args[1:])
raise exc

def execute(self, **kwargs):
raise Exception("Should not be called")
Expand All @@ -274,7 +278,12 @@ def local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Pr
for k, v in kwargs.items():
if not isinstance(v, Promise):
t = self.python_interface.inputs[k]
kwargs[k] = Promise(var=k, val=TypeEngine.to_literal(ctx, v, t, self.interface.inputs[k].type))
try:
kwargs[k] = Promise(var=k, val=TypeEngine.to_literal(ctx, v, t, self.interface.inputs[k].type))
except TypeTransformerFailedError as exc:
raise TypeError(
f"Failed to convert input argument '{k}' of workflow '{self.name}':\n {exc}"
) from exc

# The output of this will always be a combination of Python native values and Promises containing Flyte
# Literals.
Expand Down
7 changes: 5 additions & 2 deletions flytekit/exceptions/scopes.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,13 @@ def user_entry_point(wrapped, instance, args, kwargs):
_CONTEXT_STACK.append(_USER_CONTEXT)
if _is_base_context():
# See comment at this location for system_entry_point
fn_name = wrapped.__name__
try:
return wrapped(*args, **kwargs)
except FlyteScopedException as ex:
raise ex.value
except FlyteScopedException as exc:
raise exc.type(f"Error encountered while executing '{fn_name}':\n {exc.value}") from exc
except Exception as exc:
raise type(exc)(f"Error encountered while executing '{fn_name}':\n {exc}") from exc
else:
try:
return wrapped(*args, **kwargs)
Expand Down
29 changes: 23 additions & 6 deletions flytekit/loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import os

from pythonjsonlogger import jsonlogger
from rich.console import Console
from rich.logging import RichHandler

# Note:
# The environment variable controls exposed to affect the individual loggers should be considered to be beta.
Expand All @@ -10,6 +12,7 @@
# For now, assume this is the environment variable whose usage will remain unchanged and controls output for all
# loggers defined in this file.
LOGGING_ENV_VAR = "FLYTE_SDK_LOGGING_LEVEL"
LOGGING_FMT_ENV_VAR = "FLYTE_SDK_LOGGING_FORMAT"

# By default, the root flytekit logger to debug so everything is logged, but enable fine-tuning
logger = logging.getLogger("flytekit")
Expand All @@ -33,8 +36,18 @@
user_space_logger = child_loggers["user_space"]

# create console handler
ch = logging.StreamHandler()

This comment has been minimized.

Copy link
@honnix

honnix May 16, 2023

Member

This is unfortunately a breaking change but I guess a minor version bump (1.6.0) is fine? We might want to highlight this in release notes.

This comment has been minimized.

Copy link
@honnix

honnix May 16, 2023

Member

I realize I should not comment on a commit. I will move this to the PR itself.

ch.setLevel(logging.DEBUG)
try:
handler = RichHandler(
rich_tracebacks=True,
omit_repeated_times=False,
keywords=["[flytekit]"],
log_time_format="%Y-%m-%d %H:%M:%S,%f",
console=Console(width=os.get_terminal_size().columns),
)
except OSError:
handler = logging.StreamHandler()

handler.setLevel(logging.DEBUG)

# Root logger control
# Don't want to import the configuration library since that will cause all sorts of circular imports, let's
Expand Down Expand Up @@ -63,10 +76,14 @@
child_logger.setLevel(logging.WARNING)

# create formatter
formatter = jsonlogger.JsonFormatter(fmt="%(asctime)s %(name)s %(levelname)s %(message)s")
logging_fmt = os.environ.get(LOGGING_FMT_ENV_VAR, "json")
if logging_fmt == "json":
formatter = jsonlogger.JsonFormatter(fmt="%(asctime)s %(name)s %(levelname)s %(message)s")
else:
formatter = logging.Formatter(fmt="[%(name)s] %(message)s")

# add formatter to ch
ch.setFormatter(formatter)
# add formatter to the handler
handler.setFormatter(formatter)

# add ch to logger
logger.addHandler(ch)
logger.addHandler(handler)
4 changes: 3 additions & 1 deletion flytekit/models/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import abc as _abc
import json as _json
import re

from flyteidl.admin import common_pb2 as _common_pb2
from google.protobuf import json_format as _json_format
Expand Down Expand Up @@ -57,7 +58,8 @@ def short_string(self):
"""
:rtype: Text
"""
return str(self.to_flyte_idl())
literal_str = re.sub(r"\s+", " ", str(self.to_flyte_idl())).strip()
return f"<FlyteLiteral {literal_str}>"

def verbose_string(self):
"""
Expand Down
17 changes: 15 additions & 2 deletions plugins/flytekit-pandera/tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ def invalid_wf() -> pandera.typing.DataFrame[OutSchema]:
def wf_with_df_input(df: pandera.typing.DataFrame[InSchema]) -> pandera.typing.DataFrame[OutSchema]:
return transform2(df=transform1(df=df))

with pytest.raises(pandera.errors.SchemaError, match="^expected series 'col2' to have type float64, got object"):
with pytest.raises(
pandera.errors.SchemaError,
match=(
"^Encountered error while executing workflow 'test_plugin.wf_with_df_input':\n"
" expected series 'col2' to have type float64, got object"
),
):
wf_with_df_input(df=invalid_df)

# raise error when executing workflow with invalid output
Expand All @@ -67,7 +73,14 @@ def transform2_noop(df: pandera.typing.DataFrame[IntermediateSchema]) -> pandera
def wf_invalid_output(df: pandera.typing.DataFrame[InSchema]) -> pandera.typing.DataFrame[OutSchema]:
return transform2_noop(df=transform1(df=df))

with pytest.raises(TypeError, match="^Failed to convert return value"):
with pytest.raises(
TypeError,
match=(
"^Encountered error while executing workflow 'test_plugin.wf_invalid_output':\n"
" Error encountered while executing 'wf_invalid_output':\n"
" Failed to convert outputs of task"
),
):
wf_invalid_output(df=valid_df)


Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
"numpy",
"gitpython",
"kubernetes>=12.0.1",
"rich",
],
extras_require=extras_require,
scripts=[
Expand Down
Loading

0 comments on commit e865db5

Please sign in to comment.