Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Measuring the time taken for each component when executing a task #1581

Merged
merged 7 commits into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
)
from flytekit.core.tracker import TrackedInstance
from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError
from flytekit.core.utils import timeit
from flytekit.deck.deck import Deck
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
Expand Down Expand Up @@ -533,7 +534,8 @@ def dispatch_execute(
# a workflow or a subworkflow etc
logger.info(f"Invoking {self.name} with inputs: {native_inputs}")
try:
native_outputs = self.execute(**native_inputs)
with timeit("Execute user level code"):
native_outputs = self.execute(**native_inputs)
except Exception as e:
logger.exception(f"Exception when executing {e}")
raise e
Expand Down Expand Up @@ -570,21 +572,22 @@ def dispatch_execute(

# We manually construct a LiteralMap here because task inputs and outputs actually violate the assumption
# built into the IDL that all the values of a literal map are of the same type.
literals = {}
for i, (k, v) in enumerate(native_outputs_as_map.items()):
literal_type = self._outputs_interface[k].type
py_type = self.get_type_for_output_var(k, v)

if isinstance(v, tuple):
raise TypeError(f"Output({k}) in task '{self.name}' received a tuple {v}, instead of {py_type}")
try:
literals[k] = TypeEngine.to_literal(exec_ctx, v, py_type, literal_type)
except Exception as e:
# only show the name of output key if it's user-defined (by default Flyte names these as "o<n>")
key = k if k != f"o{i}" else i
msg = f"Failed to convert outputs of task '{self.name}' at position {key}:\n {e}"
logger.error(msg)
raise TypeError(msg) from e
with timeit("Translate the output to literals"):
literals = {}
for i, (k, v) in enumerate(native_outputs_as_map.items()):
literal_type = self._outputs_interface[k].type
py_type = self.get_type_for_output_var(k, v)

if isinstance(v, tuple):
raise TypeError(f"Output({k}) in task '{self.name}' received a tuple {v}, instead of {py_type}")
try:
literals[k] = TypeEngine.to_literal(exec_ctx, v, py_type, literal_type)
except Exception as e:
# only show the name of output key if it's user-defined (by default Flyte names these as "o<n>")
key = k if k != f"o{i}" else i
msg = f"Failed to convert outputs of task '{self.name}' at position {key}:\n {e}"
logger.error(msg)
raise TypeError(msg) from e

if self._disable_deck is False:
INPUT = "input"
Expand Down
16 changes: 15 additions & 1 deletion flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,20 @@ def default_deck(self) -> Deck:

return Deck("default")

@property
def timeline_deck(self) -> "TimeLineDeck": # type: ignore
from flytekit.deck.deck import TimeLineDeck

time_line_deck = None
for deck in self.decks:
if isinstance(deck, TimeLineDeck):
time_line_deck = deck
break
if time_line_deck is None:
time_line_deck = TimeLineDeck("Timeline")

return time_line_deck

def __getattr__(self, attr_name: str) -> typing.Any:
"""
This houses certain task specific context. For example in Spark, it houses the SparkSession, etc
Expand Down Expand Up @@ -725,7 +739,7 @@ class FlyteContextManager(object):
FlyteContextManager manages the execution context within Flytekit. It holds global state of either compilation
or Execution. It is not thread-safe and can only be run as a single threaded application currently.
Context's within Flytekit is useful to manage compilation state and execution state. Refer to ``CompilationState``
and ``ExecutionState`` for for information. FlyteContextManager provides a singleton stack to manage these contexts.
and ``ExecutionState`` for more information. FlyteContextManager provides a singleton stack to manage these contexts.

Typical usage is

Expand Down
13 changes: 7 additions & 6 deletions flytekit/core/data_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

from flytekit import configuration
from flytekit.configuration import DataConfig
from flytekit.core.utils import PerformanceTimer
from flytekit.core.utils import timeit
from flytekit.exceptions.user import FlyteAssertion
from flytekit.interfaces.random import random
from flytekit.loggers import logger
Expand Down Expand Up @@ -291,22 +291,23 @@ def upload_directory(self, local_path: str, remote_path: str):
"""
return self.put_data(local_path, remote_path, is_multipart=True)

@timeit("Download data to local from remote")
def get_data(self, remote_path: str, local_path: str, is_multipart: bool = False):
"""
:param remote_path:
:param local_path:
:param is_multipart:
"""
try:
with PerformanceTimer(f"Copying ({remote_path} -> {local_path})"):
pathlib.Path(local_path).parent.mkdir(parents=True, exist_ok=True)
self.get(remote_path, to_path=local_path, recursive=is_multipart)
pathlib.Path(local_path).parent.mkdir(parents=True, exist_ok=True)
self.get(remote_path, to_path=local_path, recursive=is_multipart)
except Exception as ex:
raise FlyteAssertion(
f"Failed to get data from {remote_path} to {local_path} (recursive={is_multipart}).\n\n"
f"Original exception: {str(ex)}"
)

@timeit("Upload data to remote")
def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_multipart: bool = False):
"""
The implication here is that we're always going to put data to the remote location, so we .remote to ensure
Expand All @@ -318,8 +319,8 @@ def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_mul
"""
try:
local_path = str(local_path)
with PerformanceTimer(f"Writing ({local_path} -> {remote_path})"):
self.put(cast(str, local_path), remote_path, recursive=is_multipart)

self.put(cast(str, local_path), remote_path, recursive=is_multipart)
except Exception as ex:
raise FlyteAssertion(
f"Failed to put data from {local_path} to {remote_path} (recursive={is_multipart}).\n\n"
Expand Down
2 changes: 2 additions & 0 deletions flytekit/core/map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from flytekit.core.interface import transform_interface_to_list_interface
from flytekit.core.python_function_task import PythonFunctionTask
from flytekit.core.tracker import TrackedInstance
from flytekit.core.utils import timeit
from flytekit.exceptions import scopes as exception_scopes
from flytekit.models.array_job import ArrayJob
from flytekit.models.interface import Variable
Expand Down Expand Up @@ -356,6 +357,7 @@ def foo((i: int, j: str) -> str:
def name(self) -> str:
return "MapTaskResolver"

@timeit("Load map task")
def load_task(self, loader_args: List[str], max_concurrency: int = 0) -> MapPythonTask:
"""
Loader args should be of the form
Expand Down
3 changes: 2 additions & 1 deletion flytekit/core/python_auto_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from flytekit.core.resources import Resources, ResourceSpec
from flytekit.core.tracked_abc import FlyteTrackedABC
from flytekit.core.tracker import TrackedInstance, extract_task_module
from flytekit.core.utils import _get_container_definition, _serialize_pod_spec
from flytekit.core.utils import _get_container_definition, _serialize_pod_spec, timeit
from flytekit.image_spec.image_spec import ImageBuildEngine, ImageSpec
from flytekit.loggers import logger
from flytekit.models import task as _task_model
Expand Down Expand Up @@ -227,6 +227,7 @@ class DefaultTaskResolver(TrackedInstance, TaskResolverMixin):
def name(self) -> str:
return "DefaultTaskResolver"

@timeit("Load task")
def load_task(self, loader_args: List[str]) -> PythonAutoContainerTask:
_, task_module, _, task_name, *_ = loader_args

Expand Down
4 changes: 3 additions & 1 deletion flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from flytekit.core.context_manager import FlyteContext
from flytekit.core.hash import HashMethod
from flytekit.core.type_helpers import load_type_from_tag
from flytekit.core.utils import timeit
from flytekit.exceptions import user as user_exceptions
from flytekit.loggers import logger
from flytekit.models import interface as _interface_models
Expand Down Expand Up @@ -833,7 +834,7 @@ def to_python_value(cls, ctx: FlyteContext, lv: Literal, expected_python_type: T
return transformer.to_python_value(ctx, lv, expected_python_type)

@classmethod
def to_html(cls, ctx: FlyteContext, python_val: typing.Any, expected_python_type: Type[T]) -> str:
def to_html(cls, ctx: FlyteContext, python_val: typing.Any, expected_python_type: Type[typing.Any]) -> str:
transformer = cls.get_transformer(expected_python_type)
if get_origin(expected_python_type) is Annotated:
expected_python_type, *annotate_args = get_args(expected_python_type)
Expand All @@ -856,6 +857,7 @@ def named_tuple_to_variable_map(cls, t: typing.NamedTuple) -> _interface_models.
return _interface_models.VariableMap(variables=variables)

@classmethod
@timeit("Translate literal to python value")
def literal_map_to_kwargs(
cls, ctx: FlyteContext, lm: LiteralMap, python_types: typing.Dict[str, type]
) -> typing.Dict[str, typing.Any]:
Expand Down
58 changes: 50 additions & 8 deletions flytekit/core/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import datetime
import os as _os
import shutil as _shutil
import tempfile as _tempfile
import time as _time
from functools import wraps
from hashlib import sha224 as _sha224
from pathlib import Path
from typing import Any, Dict, List, Optional, cast
from typing import Any, Callable, Dict, List, Optional, cast

from flyteidl.core import tasks_pb2 as _core_task
from kubernetes.client import ApiClient
Expand Down Expand Up @@ -259,26 +261,66 @@ def __str__(self):
return self.__repr__()


class PerformanceTimer(object):
def __init__(self, context_statement):
class timeit:
"""
A context manager and a decorator that measures the execution time of the wrapped code block or functions.
It will append a timing information to TimeLineDeck. For instance:

@timeit("Function description")
def function()

with timeit("Wrapped code block description"):
# your code
"""

def __init__(self, name: str = ""):
"""
:param Text context_statement: the statement to log
:param name: A string that describes the wrapped code block or function being executed.
"""
self._context_statement = context_statement
self._name = name
self.start_time = None
self._start_wall_time = None
self._start_process_time = None

def __call__(self, func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
with self:
return func(*args, **kwargs)

return wrapper

def __enter__(self):
logger.info("Entering timed context: {}".format(self._context_statement))
self.start_time = datetime.datetime.utcnow()
self._start_wall_time = _time.perf_counter()
self._start_process_time = _time.process_time()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""
The exception, if any, will propagate outside the context manager, as the purpose of this context manager
is solely to measure the execution time of the wrapped code block.
"""
from flytekit.core.context_manager import FlyteContextManager

end_time = datetime.datetime.utcnow()
end_wall_time = _time.perf_counter()
end_process_time = _time.process_time()

timeline_deck = FlyteContextManager.current_context().user_space_params.timeline_deck
timeline_deck.append_time_info(
dict(
Name=self._name,
Start=self.start_time,
Finish=end_time,
WallTime=end_wall_time - self._start_wall_time,
ProcessTime=end_process_time - self._start_process_time,
)
)

logger.info(
"Exiting timed context: {} [Wall Time: {}s, Process Time: {}s]".format(
self._context_statement,
"{}. [Wall Time: {}s, Process Time: {}s]".format(
self._name,
end_wall_time - self._start_wall_time,
end_process_time - self._start_process_time,
)
Expand Down
53 changes: 53 additions & 0 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
OUTPUT_DIR_JUPYTER_PREFIX = "jupyter"
DECK_FILE_NAME = "deck.html"


try:
from IPython.core.display import HTML
except ImportError:
Expand Down Expand Up @@ -79,6 +80,58 @@ def html(self) -> str:
return self._html


class TimeLineDeck(Deck):
"""
The TimeLineDeck class is designed to render the execution time of each part of a task.
Unlike deck class, the conversion of data to HTML is delayed until the html property is accessed.
This approach is taken because rendering a timeline graph with partial data would not provide meaningful insights.
Instead, the complete data set is used to create a comprehensive visualization of the execution time of each part of the task.
"""

def __init__(self, name: str, html: Optional[str] = ""):
super().__init__(name, html)
self.time_info = []

def append_time_info(self, info: dict):
assert isinstance(info, dict)
self.time_info.append(info)

@property
def html(self) -> str:
try:
from flytekitplugins.deck.renderer import GanttChartRenderer, TableRenderer
except ImportError:
warning_info = "Plugin 'flytekit-deck-standard' is not installed. To display time line, install the plugin in the image."
logger.warning(warning_info)
return warning_info

if len(self.time_info) == 0:
return ""

import pandas

df = pandas.DataFrame(self.time_info)
note = """
<p><strong>Note:</strong></p>
<ol>
<li>if the time duration is too small(< 1ms), it may be difficult to see on the time line graph.</li>
<li>For accurate execution time measurements, users should refer to wall time and process time.</li>
</ol>
"""
# set the accuracy to microsecond
df["ProcessTime"] = df["ProcessTime"].apply(lambda time: "{:.6f}".format(time))
df["WallTime"] = df["WallTime"].apply(lambda time: "{:.6f}".format(time))

width = 1400
gantt_chart_html = GanttChartRenderer().to_html(df, chart_width=width)
time_table_html = TableRenderer().to_html(
df[["Name", "WallTime", "ProcessTime"]],
header_labels=["Name", "Wall Time(s)", "Process Time(s)"],
table_width=width,
)
return gantt_chart_html + time_table_html + note


def _ipython_check() -> bool:
"""
Check if interface is launching from iPython (not colab)
Expand Down
8 changes: 5 additions & 3 deletions flytekit/deck/html/template.html
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,19 @@
}

#flyte-frame-container {
width: 100%;
width: auto;
}

#flyte-frame-container > div {
display: none;
display: None;
}

#flyte-frame-container > div.active {
display: block;
display: Block;
padding: 2rem 4rem;
width: 100%;
}

</style>

</head>
Expand Down
2 changes: 2 additions & 0 deletions flytekit/tools/fast_registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import click

from flytekit.core.context_manager import FlyteContextManager
from flytekit.core.utils import timeit
from flytekit.tools.ignore import DockerIgnore, GitIgnore, IgnoreGroup, StandardIgnore
from flytekit.tools.script_mode import tar_strip_file_attributes

Expand Down Expand Up @@ -97,6 +98,7 @@ def get_additional_distribution_loc(remote_location: str, identifier: str) -> st
return posixpath.join(remote_location, "{}.{}".format(identifier, "tar.gz"))


@timeit("Download distribution")
def download_distribution(additional_distribution: str, destination: str):
"""
Downloads a remote code distribution and overwrites any local files.
Expand Down
Loading