From b0f3791288808437ac2cdc1d3aebed46e43afb78 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 1 Dec 2022 11:36:06 -0800 Subject: [PATCH 01/10] databricks plugin Signed-off-by: Kevin Su --- .../flytekitplugins/spark/models.py | 21 +++++++++++++--- .../flytekitplugins/spark/task.py | 24 +++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/plugins/flytekit-spark/flytekitplugins/spark/models.py b/plugins/flytekit-spark/flytekitplugins/spark/models.py index 53b1620331..e43d6c74f4 100644 --- a/plugins/flytekit-spark/flytekitplugins/spark/models.py +++ b/plugins/flytekit-spark/flytekitplugins/spark/models.py @@ -1,5 +1,5 @@ import enum -import typing +from typing import Dict, Optional from flyteidl.plugins import spark_pb2 as _spark_task @@ -22,6 +22,7 @@ def __init__( main_class, spark_conf, hadoop_conf, + databricks_conf, executor_path, ): """ @@ -37,9 +38,13 @@ def __init__( self._executor_path = executor_path self._spark_conf = spark_conf self._hadoop_conf = hadoop_conf + self._databricks_conf = databricks_conf def with_overrides( - self, new_spark_conf: typing.Dict[str, str] = None, new_hadoop_conf: typing.Dict[str, str] = None + self, + new_spark_conf: Optional[Dict[str, str]] = None, + new_hadoop_conf: Optional[Dict[str, str]] = None, + new_databricks_conf: Optional[str] = None, ) -> "SparkJob": if not new_spark_conf: new_spark_conf = self.spark_conf @@ -47,12 +52,16 @@ def with_overrides( if not new_hadoop_conf: new_hadoop_conf = self.hadoop_conf + if not new_databricks_conf: + new_databricks_conf = self.databricks_conf + return SparkJob( spark_type=self.spark_type, application_file=self.application_file, main_class=self.main_class, spark_conf=new_spark_conf, hadoop_conf=new_hadoop_conf, + databricks_conf=new_databricks_conf, executor_path=self.executor_path, ) @@ -104,6 +113,10 @@ def hadoop_conf(self): """ return self._hadoop_conf + @property + def databricks_conf(self) -> str: + return self._databricks_conf + def to_flyte_idl(self): """ :rtype: flyteidl.plugins.spark_pb2.SparkJob @@ -127,6 +140,7 @@ def to_flyte_idl(self): executorPath=self.executor_path, sparkConf=self.spark_conf, hadoopConf=self.hadoop_conf, + databricksConf=self.databricks_conf, ) @classmethod @@ -145,10 +159,11 @@ def from_flyte_idl(cls, pb2_object): application_type = SparkType.R return cls( - type=application_type, + spark_type=application_type, spark_conf=pb2_object.sparkConf, application_file=pb2_object.mainApplicationFile, main_class=pb2_object.mainClass, hadoop_conf=pb2_object.hadoopConf, executor_path=pb2_object.executorPath, + databricks_conf=pb2_object.databricksConf, ) diff --git a/plugins/flytekit-spark/flytekitplugins/spark/task.py b/plugins/flytekit-spark/flytekitplugins/spark/task.py index 8428b492ce..c83720b676 100644 --- a/plugins/flytekit-spark/flytekitplugins/spark/task.py +++ b/plugins/flytekit-spark/flytekitplugins/spark/task.py @@ -1,3 +1,5 @@ +import base64 +import json import os import typing from dataclasses import dataclass @@ -13,6 +15,23 @@ from .models import SparkJob, SparkType +_ = { + "name": "flytekit databricks plugin example", + "new_cluster": { + "spark_version": "11.0.x-scala2.12", + "node_type_id": "r3.xlarge", + "aws_attributes": {"availability": "ON_DEMAND"}, + "num_workers": 4, + "docker_image": {"url": "pingsutw/databricks:latest"}, + }, + "timeout_seconds": 3600, + "max_retries": 1, + "spark_python_task": { + "python_file": "dbfs:///FileStore/tables/entrypoint-1.py", + "parameters": "ls", + }, +} + @dataclass class Spark(object): @@ -27,6 
+46,7 @@ class Spark(object): spark_conf: Optional[Dict[str, str]] = None hadoop_conf: Optional[Dict[str, str]] = None + databricks_conf: typing.Optional[dict] = None def __post_init__(self): if self.spark_conf is None: @@ -35,6 +55,9 @@ def __post_init__(self): if self.hadoop_conf is None: self.hadoop_conf = {} + if self.databricks_conf is None: + self.databricks_conf = {} + # This method does not reset the SparkSession since it's a bit hard to handle multiple # Spark sessions in a single application as it's described in: @@ -95,6 +118,7 @@ def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: job = SparkJob( spark_conf=self.task_config.spark_conf, hadoop_conf=self.task_config.hadoop_conf, + databricks_conf=base64.b64encode(json.dumps(self.task_config.databricks_conf).encode()).decode(), application_file="local://" + settings.entrypoint_settings.path, executor_path=settings.python_interpreter, main_class="", From 8bc4db01e8bc7072e16a1d2622e79afd9cc7cd3b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 1 Dec 2022 17:41:40 -0800 Subject: [PATCH 02/10] update test Signed-off-by: Kevin Su --- .../flytekitplugins/spark/models.py | 2 +- .../flytekitplugins/spark/task.py | 17 ---------------- .../flytekit-spark/tests/test_spark_task.py | 20 ++++++++++++++++++- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/plugins/flytekit-spark/flytekitplugins/spark/models.py b/plugins/flytekit-spark/flytekitplugins/spark/models.py index e43d6c74f4..d012dc3a66 100644 --- a/plugins/flytekit-spark/flytekitplugins/spark/models.py +++ b/plugins/flytekit-spark/flytekitplugins/spark/models.py @@ -159,7 +159,7 @@ def from_flyte_idl(cls, pb2_object): application_type = SparkType.R return cls( - spark_type=application_type, + type=application_type, spark_conf=pb2_object.sparkConf, application_file=pb2_object.mainApplicationFile, main_class=pb2_object.mainClass, diff --git a/plugins/flytekit-spark/flytekitplugins/spark/task.py b/plugins/flytekit-spark/flytekitplugins/spark/task.py index c83720b676..05e43a6202 100644 --- a/plugins/flytekit-spark/flytekitplugins/spark/task.py +++ b/plugins/flytekit-spark/flytekitplugins/spark/task.py @@ -15,23 +15,6 @@ from .models import SparkJob, SparkType -_ = { - "name": "flytekit databricks plugin example", - "new_cluster": { - "spark_version": "11.0.x-scala2.12", - "node_type_id": "r3.xlarge", - "aws_attributes": {"availability": "ON_DEMAND"}, - "num_workers": 4, - "docker_image": {"url": "pingsutw/databricks:latest"}, - }, - "timeout_seconds": 3600, - "max_retries": 1, - "spark_python_task": { - "python_file": "dbfs:///FileStore/tables/entrypoint-1.py", - "parameters": "ls", - }, -} - @dataclass class Spark(object): diff --git a/plugins/flytekit-spark/tests/test_spark_task.py b/plugins/flytekit-spark/tests/test_spark_task.py index 38555fa9b8..18f4349931 100644 --- a/plugins/flytekit-spark/tests/test_spark_task.py +++ b/plugins/flytekit-spark/tests/test_spark_task.py @@ -19,7 +19,24 @@ def reset_spark_session() -> None: def test_spark_task(reset_spark_session): - @task(task_config=Spark(spark_conf={"spark": "1"})) + databricks_conf = { + "name": "flytekit databricks plugin example", + "new_cluster": { + "spark_version": "11.0.x-scala2.12", + "node_type_id": "r3.xlarge", + "aws_attributes": {"availability": "ON_DEMAND"}, + "num_workers": 4, + "docker_image": {"url": "pingsutw/databricks:latest"}, + }, + "timeout_seconds": 3600, + "max_retries": 1, + "spark_python_task": { + "python_file": "dbfs:///FileStore/tables/entrypoint-1.py", + "parameters": 
"ls", + }, + } + + @task(task_config=Spark(spark_conf={"spark": "1"}, databricks_conf=databricks_conf)) def my_spark(a: str) -> int: session = flytekit.current_context().spark_session assert session.sparkContext.appName == "FlyteSpark: ex:local:local:local" @@ -27,6 +44,7 @@ def my_spark(a: str) -> int: assert my_spark.task_config is not None assert my_spark.task_config.spark_conf == {"spark": "1"} + assert my_spark.task_config.databricks_conf == databricks_conf default_img = Image(name="default", fqn="test", tag="tag") settings = SerializationSettings( From 9364e40fa3d12b3a4af48f179b82e50a3ad1506b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 6 Dec 2022 16:55:12 -0800 Subject: [PATCH 03/10] nit Signed-off-by: Kevin Su --- plugins/flytekit-spark/flytekitplugins/spark/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-spark/flytekitplugins/spark/task.py b/plugins/flytekit-spark/flytekitplugins/spark/task.py index 05e43a6202..0679e9e88f 100644 --- a/plugins/flytekit-spark/flytekitplugins/spark/task.py +++ b/plugins/flytekit-spark/flytekitplugins/spark/task.py @@ -29,7 +29,7 @@ class Spark(object): spark_conf: Optional[Dict[str, str]] = None hadoop_conf: Optional[Dict[str, str]] = None - databricks_conf: typing.Optional[dict] = None + databricks_conf: typing.Optional[Dict[str, typing.Union[str, dict]]] = None def __post_init__(self): if self.spark_conf is None: From fa4a54f7a3ddcaff9fee5ef67e3081c4dc34b171 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 9 Dec 2022 00:41:01 -0800 Subject: [PATCH 04/10] test Signed-off-by: Kevin Su --- .github/workflows/pythonbuild.yml | 4 ++++ flytekit/core/python_auto_container.py | 2 ++ plugins/flytekit-spark/flytekitplugins/spark/models.py | 1 + 3 files changed, 7 insertions(+) diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index fc8a554cdd..b705b8e690 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -46,6 +46,7 @@ jobs: - name: Install dependencies run: | make setup${{ matrix.spark-version-suffix }} + pip install "git+https://github.com/flyteorg/flyteidl@databricks" pip freeze - name: Test with coverage run: | @@ -131,6 +132,7 @@ jobs: pip install -r requirements.txt if [ -f dev-requirements.txt ]; then pip install -r dev-requirements.txt; fi pip install --no-deps -U https://github.com/flyteorg/flytekit/archive/${{ github.sha }}.zip#egg=flytekit + pip install "git+https://github.com/flyteorg/flyteidl@databricks" pip freeze - name: Test with coverage run: | @@ -155,6 +157,7 @@ jobs: run: | python -m pip install --upgrade pip==21.2.4 pip install -r dev-requirements.txt + pip install "git+https://github.com/flyteorg/flyteidl@databricks" - name: Lint run: | make lint @@ -176,5 +179,6 @@ jobs: run: | python -m pip install --upgrade pip==21.2.4 setuptools wheel pip install -r doc-requirements.txt + pip install "git+https://github.com/flyteorg/flyteidl@databricks" - name: Build the documentation run: make -C docs html diff --git a/flytekit/core/python_auto_container.py b/flytekit/core/python_auto_container.py index 06133d9784..e7d0014de1 100644 --- a/flytekit/core/python_auto_container.py +++ b/flytekit/core/python_auto_container.py @@ -2,6 +2,7 @@ import importlib import re +import sys from abc import ABC from types import ModuleType from typing import Callable, Dict, List, Optional, TypeVar, Union @@ -191,6 +192,7 @@ def name(self) -> str: def load_task(self, loader_args: List[Union[T, ModuleType]]) -> PythonAutoContainerTask: _, 
task_module, _, task_name, *_ = loader_args + sys.path.append(".") task_module = importlib.import_module(task_module) task_def = getattr(task_module, task_name) return task_def diff --git a/plugins/flytekit-spark/flytekitplugins/spark/models.py b/plugins/flytekit-spark/flytekitplugins/spark/models.py index d012dc3a66..5ddda6b939 100644 --- a/plugins/flytekit-spark/flytekitplugins/spark/models.py +++ b/plugins/flytekit-spark/flytekitplugins/spark/models.py @@ -31,6 +31,7 @@ def __init__( :param application_file: The main application file to execute. :param dict[Text, Text] spark_conf: A definition of key-value pairs for spark config for the job. :param dict[Text, Text] hadoop_conf: A definition of key-value pairs for hadoop config for the job. + :param dict[Text, dict] databricks_conf: A definition of key-value pairs for databricks config for the job. Refer to https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit. """ self._application_file = application_file self._spark_type = spark_type From 25b7d9f5645480ceed6234af63f0553ef4066452 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 9 Dec 2022 01:21:28 -0800 Subject: [PATCH 05/10] nit Signed-off-by: Kevin Su --- plugins/flytekit-spark/flytekitplugins/spark/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-spark/flytekitplugins/spark/task.py b/plugins/flytekit-spark/flytekitplugins/spark/task.py index 0679e9e88f..a394766931 100644 --- a/plugins/flytekit-spark/flytekitplugins/spark/task.py +++ b/plugins/flytekit-spark/flytekitplugins/spark/task.py @@ -114,7 +114,7 @@ def pre_execute(self, user_params: ExecutionParameters) -> ExecutionParameters: ctx = FlyteContextManager.current_context() sess_builder = _pyspark.sql.SparkSession.builder.appName(f"FlyteSpark: {user_params.execution_id}") - if not (ctx.execution_state and ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION): + if self.task_config.spark_conf and not (ctx.execution_state and ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION): # If either of above cases is not true, then we are in local execution of this task # Add system spark-conf for local/notebook based execution. 
             spark_conf = _pyspark.SparkConf()

From 0348cd52c1ba695771f40176088e2e6853c94e5a Mon Sep 17 00:00:00 2001
From: Kevin Su
Date: Thu, 15 Dec 2022 10:23:59 +0800
Subject: [PATCH 06/10] Update databricks branch (#1370)

* test

Signed-off-by: Kevin Su

* test

Signed-off-by: Kevin Su

* test

Signed-off-by: Kevin Su

* test

Signed-off-by: Kevin Su

* test

Signed-off-by: Kevin Su

* test

Signed-off-by: Kevin Su

* test

Signed-off-by: Kevin Su

* test

Signed-off-by: Kevin Su

* test

Signed-off-by: Kevin Su

* test

Signed-off-by: Kevin Su

* test

Signed-off-by: Kevin Su

* add instance name

Signed-off-by: Kevin Su

* nit

Signed-off-by: Kevin Su

Signed-off-by: Kevin Su
---
 flytekit/bin/entrypoint.py             | 16 +++----
 flytekit/core/python_auto_container.py |  1 -
 .../flytekitplugins/spark/models.py    | 46 +++++++++++++++++--
 .../flytekitplugins/spark/task.py      | 11 ++++-
 4 files changed, 59 insertions(+), 15 deletions(-)

diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py
index 1f8dd78ef0..10e7224e47 100644
--- a/flytekit/bin/entrypoint.py
+++ b/flytekit/bin/entrypoint.py
@@ -445,14 +445,14 @@ def _pass_through():
 def execute_task_cmd(
     inputs,
     output_prefix,
-    raw_output_data_prefix,
-    test,
-    prev_checkpoint,
-    checkpoint_path,
-    dynamic_addl_distro,
-    dynamic_dest_dir,
-    resolver,
-    resolver_args,
+    raw_output_data_prefix=None,
+    test=False,
+    prev_checkpoint=None,
+    checkpoint_path=None,
+    dynamic_addl_distro=None,
+    dynamic_dest_dir=None,
+    resolver=None,
+    resolver_args=None,
 ):
     logger.info(get_version_message())
     # We get weird errors if there are no click echo messages at all, so emit an empty string so that unit tests pass.
diff --git a/flytekit/core/python_auto_container.py b/flytekit/core/python_auto_container.py
index e7d0014de1..d481982eee 100644
--- a/flytekit/core/python_auto_container.py
+++ b/flytekit/core/python_auto_container.py
@@ -192,7 +192,6 @@ def name(self) -> str:
     def load_task(self, loader_args: List[Union[T, ModuleType]]) -> PythonAutoContainerTask:
         _, task_module, _, task_name, *_ = loader_args
-        sys.path.append(".")
         task_module = importlib.import_module(task_module)
         task_def = getattr(task_module, task_name)
         return task_def
diff --git a/plugins/flytekit-spark/flytekitplugins/spark/models.py b/plugins/flytekit-spark/flytekitplugins/spark/models.py
index 5ddda6b939..73fa239e5b 100644
--- a/plugins/flytekit-spark/flytekitplugins/spark/models.py
+++ b/plugins/flytekit-spark/flytekitplugins/spark/models.py
@@ -2,6 +2,8 @@
 from typing import Dict, Optional

 from flyteidl.plugins import spark_pb2 as _spark_task
+from google.protobuf import json_format
+from google.protobuf.struct_pb2 import Struct

 from flytekit.exceptions import user as _user_exceptions
 from flytekit.models import common as _common
@@ -23,6 +25,8 @@ def __init__(
         spark_conf,
         hadoop_conf,
         databricks_conf,
+        databricks_token,
+        databricks_instance,
         executor_path,
     ):
         """
@@ -32,6 +36,8 @@ def __init__(
         :param application_file: The main application file to execute.
         :param dict[Text, Text] spark_conf: A definition of key-value pairs for spark config for the job.
         :param dict[Text, Text] hadoop_conf: A definition of key-value pairs for hadoop config for the job.
         :param dict[Text, dict] databricks_conf: A definition of key-value pairs for databricks config for the job. Refer to https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit.
+        :param str databricks_token: databricks access token.
+        :param str databricks_instance: Domain name of your deployment. Use the form <account>.cloud.databricks.com.
""" self._application_file = application_file self._spark_type = spark_type @@ -40,12 +46,14 @@ def __init__( self._spark_conf = spark_conf self._hadoop_conf = hadoop_conf self._databricks_conf = databricks_conf + self._databricks_token = databricks_token + self._databricks_instance = databricks_instance def with_overrides( self, new_spark_conf: Optional[Dict[str, str]] = None, new_hadoop_conf: Optional[Dict[str, str]] = None, - new_databricks_conf: Optional[str] = None, + new_databricks_conf: Optional[Dict[str, Dict]] = None, ) -> "SparkJob": if not new_spark_conf: new_spark_conf = self.spark_conf @@ -63,6 +71,8 @@ def with_overrides( spark_conf=new_spark_conf, hadoop_conf=new_hadoop_conf, databricks_conf=new_databricks_conf, + databricks_token=self.databricks_token, + databricks_instance=self.databricks_instance, executor_path=self.executor_path, ) @@ -115,9 +125,30 @@ def hadoop_conf(self): return self._hadoop_conf @property - def databricks_conf(self) -> str: + def databricks_conf(self) -> Dict[str, Dict]: + """ + databricks_conf: Databricks job configuration. + Config structure can be found here. https://docs.databricks.com/dev-tools/api/2.0/jobs.html#request-structure + :rtype: dict[Text, dict[Text, Text]] + """ return self._databricks_conf + @property + def databricks_token(self) -> str: + """ + Databricks access token + :rtype: str + """ + return self._databricks_token + + @property + def databricks_instance(self) -> str: + """ + Domain name of your deployment. Use the form .cloud.databricks.com. + :rtype: str + """ + return self._databricks_instance + def to_flyte_idl(self): """ :rtype: flyteidl.plugins.spark_pb2.SparkJob @@ -134,6 +165,9 @@ def to_flyte_idl(self): else: raise _user_exceptions.FlyteValidationException("Invalid Spark Application Type Specified") + databricks_conf = Struct() + databricks_conf.update(self.databricks_conf) + return _spark_task.SparkJob( applicationType=application_type, mainApplicationFile=self.application_file, @@ -141,7 +175,9 @@ def to_flyte_idl(self): executorPath=self.executor_path, sparkConf=self.spark_conf, hadoopConf=self.hadoop_conf, - databricksConf=self.databricks_conf, + databricksConf=databricks_conf, + databricksToken=self.databricks_token, + databricksInstance=self.databricks_instance, ) @classmethod @@ -166,5 +202,7 @@ def from_flyte_idl(cls, pb2_object): main_class=pb2_object.mainClass, hadoop_conf=pb2_object.hadoopConf, executor_path=pb2_object.executorPath, - databricks_conf=pb2_object.databricksConf, + databricks_conf=json_format.MessageToDict(pb2_object.databricksConf), + databricks_token=pb2_object.databricksToken, + databricks_instance=pb2_object.databricksInstance, ) diff --git a/plugins/flytekit-spark/flytekitplugins/spark/task.py b/plugins/flytekit-spark/flytekitplugins/spark/task.py index a394766931..8d795be8ae 100644 --- a/plugins/flytekit-spark/flytekitplugins/spark/task.py +++ b/plugins/flytekit-spark/flytekitplugins/spark/task.py @@ -25,11 +25,16 @@ class Spark(object): Args: spark_conf: Dictionary of spark config. The variables should match what spark expects hadoop_conf: Dictionary of hadoop conf. The variables should match a typical hadoop configuration for spark + databricks_conf: Databricks job configuration. Config structure can be found here. https://docs.databricks.com/dev-tools/api/2.0/jobs.html#request-structure + databricks_token: Databricks access token. https://docs.databricks.com/dev-tools/api/latest/authentication.html. + databricks_instance: Domain name of your deployment. 
     """

     spark_conf: Optional[Dict[str, str]] = None
     hadoop_conf: Optional[Dict[str, str]] = None
     databricks_conf: typing.Optional[Dict[str, typing.Union[str, dict]]] = None
+    databricks_token: Optional[str] = None
+    databricks_instance: Optional[str] = None

     def __post_init__(self):
         if self.spark_conf is None:
@@ -101,7 +106,9 @@ def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
         job = SparkJob(
             spark_conf=self.task_config.spark_conf,
             hadoop_conf=self.task_config.hadoop_conf,
-            databricks_conf=base64.b64encode(json.dumps(self.task_config.databricks_conf).encode()).decode(),
+            databricks_conf=self.task_config.databricks_conf,
+            databricks_token=self.task_config.databricks_token,
+            databricks_instance=self.task_config.databricks_instance,
             application_file="local://" + settings.entrypoint_settings.path,
             executor_path=settings.python_interpreter,
             main_class="",
@@ -114,7 +121,7 @@ def pre_execute(self, user_params: ExecutionParameters) -> ExecutionParameters:
         ctx = FlyteContextManager.current_context()
         sess_builder = _pyspark.sql.SparkSession.builder.appName(f"FlyteSpark: {user_params.execution_id}")

-        if self.task_config.spark_conf and not (ctx.execution_state and ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION):
+        if not (ctx.execution_state and ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION):
             # If either of above cases is not true, then we are in local execution of this task
             # Add system spark-conf for local/notebook based execution.
             spark_conf = _pyspark.SparkConf()

From bc3dd06f55d504c532c9100e5cb6d2d348fbe01c Mon Sep 17 00:00:00 2001
From: Kevin Su
Date: Fri, 16 Dec 2022 12:26:10 -0800
Subject: [PATCH 07/10] Address comment

Signed-off-by: Kevin Su
---
 flytekit/bin/entrypoint.py             | 16 ++++++++--------
 flytekit/core/python_auto_container.py |  1 -
 2 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py
index 10e7224e47..1f8dd78ef0 100644
--- a/flytekit/bin/entrypoint.py
+++ b/flytekit/bin/entrypoint.py
@@ -445,14 +445,14 @@ def _pass_through():
 def execute_task_cmd(
     inputs,
     output_prefix,
-    raw_output_data_prefix=None,
-    test=False,
-    prev_checkpoint=None,
-    checkpoint_path=None,
-    dynamic_addl_distro=None,
-    dynamic_dest_dir=None,
-    resolver=None,
-    resolver_args=None,
+    raw_output_data_prefix,
+    test,
+    prev_checkpoint,
+    checkpoint_path,
+    dynamic_addl_distro,
+    dynamic_dest_dir,
+    resolver,
+    resolver_args,
 ):
     logger.info(get_version_message())
     # We get weird errors if there are no click echo messages at all, so emit an empty string so that unit tests pass.
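For reference, the Struct-based databricks_conf handling that PATCH 06 adds to models.py amounts to a plain dict-to-protobuf round-trip. A minimal, self-contained sketch of that round-trip follows; the config values are illustrative only, not taken from the patch:

from google.protobuf import json_format
from google.protobuf.struct_pb2 import Struct

# Illustrative Databricks job config; any JSON-compatible dict works here.
databricks_conf = {
    "new_cluster": {"spark_version": "11.0.x-scala2.12", "num_workers": 4},
    "timeout_seconds": 3600,
}

# to_flyte_idl() side: pack the plain dict into a protobuf Struct.
s = Struct()
s.update(databricks_conf)

# from_flyte_idl() side: unpack the Struct back into a plain dict.
roundtripped = json_format.MessageToDict(s)
assert roundtripped["new_cluster"]["spark_version"] == "11.0.x-scala2.12"
# Struct stores all numbers as doubles, so 3600 comes back as 3600.0.
assert roundtripped["timeout_seconds"] == 3600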
diff --git a/flytekit/core/python_auto_container.py b/flytekit/core/python_auto_container.py index d481982eee..06133d9784 100644 --- a/flytekit/core/python_auto_container.py +++ b/flytekit/core/python_auto_container.py @@ -2,7 +2,6 @@ import importlib import re -import sys from abc import ABC from types import ModuleType from typing import Callable, Dict, List, Optional, TypeVar, Union From ea5f2be7bd3f7a5880d0aa66344cf68c6301b617 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 16 Dec 2022 18:57:19 -0800 Subject: [PATCH 08/10] nit Signed-off-by: Kevin Su --- .github/workflows/pythonbuild.yml | 7 +------ dev-requirements.txt | 2 +- doc-requirements.txt | 2 +- plugins/flytekit-spark/flytekitplugins/spark/task.py | 2 -- requirements-spark2.txt | 2 +- requirements.txt | 2 +- 6 files changed, 5 insertions(+), 12 deletions(-) diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index 9a3a6340d0..4c96e9f011 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -46,7 +46,6 @@ jobs: - name: Install dependencies run: | make setup${{ matrix.spark-version-suffix }} - pip install "git+https://github.com/flyteorg/flyteidl@databricks" pip freeze - name: Test with coverage run: | @@ -133,9 +132,7 @@ jobs: cd plugins/${{ matrix.plugin-names }} pip install -r requirements.txt if [ -f dev-requirements.txt ]; then pip install -r dev-requirements.txt; fi - pip install --no-deps -U https://github.com/flyteorg/flytekit/archive/${{ github.sha }}.zip#egg=flytekit - pip install "git+https://github.com/flyteorg/flyteidl@databricks" - # pip install -U https://github.com/flyteorg/flytekit/archive/${{ github.sha }}.zip#egg=flytekit + pip install -U https://github.com/flyteorg/flytekit/archive/${{ github.sha }}.zip#egg=flytekit pip freeze - name: Test with coverage run: | @@ -166,7 +163,6 @@ jobs: run: | python -m pip install --upgrade pip==21.2.4 pip install -r dev-requirements.txt - pip install "git+https://github.com/flyteorg/flyteidl@databricks" - name: Lint run: | make lint @@ -188,7 +184,6 @@ jobs: run: | python -m pip install --upgrade pip==21.2.4 setuptools wheel pip install -r doc-requirements.txt - pip install "git+https://github.com/flyteorg/flyteidl@databricks" - name: Build the documentation run: | # TODO: Remove after buf migration is done and packages updated diff --git a/dev-requirements.txt b/dev-requirements.txt index a9de992c14..14510b09cb 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -130,7 +130,7 @@ filelock==3.8.2 # via virtualenv flatbuffers==22.12.6 # via tensorflow -flyteidl==1.3.0 +flyteidl==1.3.1 # via # -c requirements.txt # flytekit diff --git a/doc-requirements.txt b/doc-requirements.txt index be5d737bfd..7c92fcb018 100644 --- a/doc-requirements.txt +++ b/doc-requirements.txt @@ -182,7 +182,7 @@ filelock==3.8.2 # virtualenv flatbuffers==22.12.6 # via tensorflow -flyteidl==1.3.0 +flyteidl==1.3.1 # via flytekit fonttools==4.38.0 # via matplotlib diff --git a/plugins/flytekit-spark/flytekitplugins/spark/task.py b/plugins/flytekit-spark/flytekitplugins/spark/task.py index 8d795be8ae..f1b3cdff6a 100644 --- a/plugins/flytekit-spark/flytekitplugins/spark/task.py +++ b/plugins/flytekit-spark/flytekitplugins/spark/task.py @@ -1,5 +1,3 @@ -import base64 -import json import os import typing from dataclasses import dataclass diff --git a/requirements-spark2.txt b/requirements-spark2.txt index 3c07cb3cee..c6d0ff7fc0 100644 --- a/requirements-spark2.txt +++ b/requirements-spark2.txt @@ -52,7 +52,7 @@ 
docker-image-py==0.1.12 # via flytekit docstring-parser==0.15 # via flytekit -flyteidl==1.3.0 +flyteidl==1.3.1 # via flytekit googleapis-common-protos==1.57.0 # via diff --git a/requirements.txt b/requirements.txt index caff0db497..5623078a25 100644 --- a/requirements.txt +++ b/requirements.txt @@ -50,7 +50,7 @@ docker-image-py==0.1.12 # via flytekit docstring-parser==0.15 # via flytekit -flyteidl==1.3.0 +flyteidl==1.3.1 # via flytekit googleapis-common-protos==1.57.0 # via From e0aee0aa4251e9a3c6051da06670527ac62bd95a Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 16 Dec 2022 19:16:02 -0800 Subject: [PATCH 09/10] updateidl Signed-off-by: Kevin Su --- .github/workflows/pythonbuild.yml | 2 +- plugins/flytekit-spark/requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index 4c96e9f011..4c16153be9 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -80,7 +80,7 @@ jobs: - flytekit-modin - flytekit-onnx-pytorch - flytekit-onnx-scikitlearn - # onnxx-tensorflow needs a version of tensorflow that does not work with protobuf>4. + # onnx-tensorflow needs a version of tensorflow that does not work with protobuf>4. # The issue is being tracked on the tensorflow side in https://github.com/tensorflow/tensorflow/issues/53234#issuecomment-1330111693 # flytekit-onnx-tensorflow - flytekit-pandera diff --git a/plugins/flytekit-spark/requirements.txt b/plugins/flytekit-spark/requirements.txt index a8df06cc24..b7087441ef 100644 --- a/plugins/flytekit-spark/requirements.txt +++ b/plugins/flytekit-spark/requirements.txt @@ -46,7 +46,7 @@ docker-image-py==0.1.12 # via flytekit docstring-parser==0.15 # via flytekit -flyteidl==1.3.0 +flyteidl==1.3.1 # via flytekit flytekit==1.3.0b2 # via flytekitplugins-spark From c05d01d65d29ea014cfaaf6704118b9a9632285c Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 19 Dec 2022 14:02:35 -0800 Subject: [PATCH 10/10] Update databricks config Signed-off-by: Kevin Su --- .../flytekitplugins/spark/models.py | 24 +++++++------- .../flytekitplugins/spark/task.py | 33 ++++++++++++------- .../flytekit-spark/tests/test_spark_task.py | 27 +++++++++++++-- 3 files changed, 58 insertions(+), 26 deletions(-) diff --git a/plugins/flytekit-spark/flytekitplugins/spark/models.py b/plugins/flytekit-spark/flytekitplugins/spark/models.py index 73fa239e5b..28e67ac631 100644 --- a/plugins/flytekit-spark/flytekitplugins/spark/models.py +++ b/plugins/flytekit-spark/flytekitplugins/spark/models.py @@ -19,15 +19,15 @@ class SparkType(enum.Enum): class SparkJob(_common.FlyteIdlEntity): def __init__( self, - spark_type, - application_file, - main_class, - spark_conf, - hadoop_conf, - databricks_conf, - databricks_token, - databricks_instance, - executor_path, + spark_type: SparkType, + application_file: str, + main_class: str, + spark_conf: Dict[str, str], + hadoop_conf: Dict[str, str], + executor_path: str, + databricks_conf: Dict[str, Dict[str, Dict]] = {}, + databricks_token: Optional[str] = None, + databricks_instance: Optional[str] = None, ): """ This defines a SparkJob target. It will execute the appropriate SparkJob. @@ -35,9 +35,9 @@ def __init__( :param application_file: The main application file to execute. :param dict[Text, Text] spark_conf: A definition of key-value pairs for spark config for the job. :param dict[Text, Text] hadoop_conf: A definition of key-value pairs for hadoop config for the job. 
-        :param dict[Text, dict] databricks_conf: A definition of key-value pairs for databricks config for the job. Refer to https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit.
-        :param str databricks_token: databricks access token.
-        :param str databricks_instance: Domain name of your deployment. Use the form <account>.cloud.databricks.com.
+        :param Optional[dict[Text, dict]] databricks_conf: A definition of key-value pairs for databricks config for the job. Refer to https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit.
+        :param Optional[str] databricks_token: databricks access token.
+        :param Optional[str] databricks_instance: Domain name of your deployment. Use the form <account>.cloud.databricks.com.
         """
         self._application_file = application_file
         self._spark_type = spark_type
diff --git a/plugins/flytekit-spark/flytekitplugins/spark/task.py b/plugins/flytekit-spark/flytekitplugins/spark/task.py
index f1b3cdff6a..180a28bb87 100644
--- a/plugins/flytekit-spark/flytekitplugins/spark/task.py
+++ b/plugins/flytekit-spark/flytekitplugins/spark/task.py
@@ -23,16 +23,10 @@ class Spark(object):
     Args:
         spark_conf: Dictionary of spark config. The variables should match what spark expects
         hadoop_conf: Dictionary of hadoop conf. The variables should match a typical hadoop configuration for spark
-        databricks_conf: Databricks job configuration. Config structure can be found here. https://docs.databricks.com/dev-tools/api/2.0/jobs.html#request-structure
-        databricks_token: Databricks access token. https://docs.databricks.com/dev-tools/api/latest/authentication.html.
-        databricks_instance: Domain name of your deployment. Use the form <account>.cloud.databricks.com.
     """

     spark_conf: Optional[Dict[str, str]] = None
     hadoop_conf: Optional[Dict[str, str]] = None
-    databricks_conf: typing.Optional[Dict[str, typing.Union[str, dict]]] = None
-    databricks_token: Optional[str] = None
-    databricks_instance: Optional[str] = None

     def __post_init__(self):
         if self.spark_conf is None:
@@ -41,8 +35,22 @@ def __post_init__(self):
         if self.hadoop_conf is None:
             self.hadoop_conf = {}

-        if self.databricks_conf is None:
-            self.databricks_conf = {}
+
+@dataclass
+class Databricks(Spark):
+    """
+    Use this to configure a Databricks task. Tasks marked with this will automatically execute
+    natively on the Databricks platform as a distributed Spark execution.
+
+    Args:
+        databricks_conf: Databricks job configuration. Config structure can be found here. https://docs.databricks.com/dev-tools/api/2.0/jobs.html#request-structure
+        databricks_token: Databricks access token. https://docs.databricks.com/dev-tools/api/latest/authentication.html.
+        databricks_instance: Domain name of your deployment. Use the form <account>.cloud.databricks.com.
+ """ + + databricks_conf: typing.Optional[Dict[str, typing.Union[str, dict]]] = None + databricks_token: Optional[str] = None + databricks_instance: Optional[str] = None # This method does not reset the SparkSession since it's a bit hard to handle multiple @@ -104,14 +112,17 @@ def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: job = SparkJob( spark_conf=self.task_config.spark_conf, hadoop_conf=self.task_config.hadoop_conf, - databricks_conf=self.task_config.databricks_conf, - databricks_token=self.task_config.databricks_token, - databricks_instance=self.task_config.databricks_instance, application_file="local://" + settings.entrypoint_settings.path, executor_path=settings.python_interpreter, main_class="", spark_type=SparkType.PYTHON, ) + if isinstance(self.task_config, Databricks): + cfg = typing.cast(self.task_config, Databricks) + job._databricks_conf = cfg.databricks_conf + job._databricks_token = cfg.databricks_token + job._databricks_instance = cfg.databricks_instance + return MessageToDict(job.to_flyte_idl()) def pre_execute(self, user_params: ExecutionParameters) -> ExecutionParameters: diff --git a/plugins/flytekit-spark/tests/test_spark_task.py b/plugins/flytekit-spark/tests/test_spark_task.py index 18f4349931..7049684a2d 100644 --- a/plugins/flytekit-spark/tests/test_spark_task.py +++ b/plugins/flytekit-spark/tests/test_spark_task.py @@ -2,7 +2,7 @@ import pyspark import pytest from flytekitplugins.spark import Spark -from flytekitplugins.spark.task import new_spark_session +from flytekitplugins.spark.task import Databricks, new_spark_session from pyspark.sql import SparkSession import flytekit @@ -36,7 +36,7 @@ def test_spark_task(reset_spark_session): }, } - @task(task_config=Spark(spark_conf={"spark": "1"}, databricks_conf=databricks_conf)) + @task(task_config=Spark(spark_conf={"spark": "1"})) def my_spark(a: str) -> int: session = flytekit.current_context().spark_session assert session.sparkContext.appName == "FlyteSpark: ex:local:local:local" @@ -44,7 +44,6 @@ def my_spark(a: str) -> int: assert my_spark.task_config is not None assert my_spark.task_config.spark_conf == {"spark": "1"} - assert my_spark.task_config.databricks_conf == databricks_conf default_img = Image(name="default", fqn="test", tag="tag") settings = SerializationSettings( @@ -71,6 +70,28 @@ def my_spark(a: str) -> int: assert ("spark", "1") in configs assert ("spark.app.name", "FlyteSpark: ex:local:local:local") in configs + databricks_token = "token" + databricks_instance = "account.cloud.databricks.com" + + @task( + task_config=Databricks( + spark_conf={"spark": "2"}, + databricks_conf=databricks_conf, + databricks_instance="account.cloud.databricks.com", + databricks_token="token", + ) + ) + def my_databricks(a: str) -> int: + session = flytekit.current_context().spark_session + assert session.sparkContext.appName == "FlyteSpark: ex:local:local:local" + return 10 + + assert my_databricks.task_config is not None + assert my_databricks.task_config.spark_conf == {"spark": "2"} + assert my_databricks.task_config.databricks_conf == databricks_conf + assert my_databricks.task_config.databricks_instance == databricks_instance + assert my_databricks.task_config.databricks_token == databricks_token + def test_new_spark_session(): name = "SessionName"