From f6fddd7e603939e1190ce4d26a6a2575dfeea25e Mon Sep 17 00:00:00 2001
From: "Roel M. Hogervorst"
Date: Fri, 10 Apr 2020 14:08:23 +0200
Subject: [PATCH 1/3] GCP SparkR Example

Allows you to schedule R and SparkR jobs on a Dataproc cluster. The
functionality to run this kind of job already exists in Dataproc, but it
was not clear how to do so from Airflow. (A minimal SparkR-only sketch
follows this patch.)
---
 .../cloud/example_dags/example_dataproc.py    | 14 +++++++++++++-
 .../cloud/operators/test_dataproc_system.py   | 18 ++++++++++++++++--
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc.py b/airflow/providers/google/cloud/example_dags/example_dataproc.py
index a2f1c82bfe36..573a5a782cf0 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataproc.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataproc.py
@@ -38,7 +38,8 @@
 OUTPUT_PATH = "gs://{}/{}/".format(BUCKET, OUTPUT_FOLDER)
 PYSPARK_MAIN = os.environ.get("PYSPARK_MAIN", "hello_world.py")
 PYSPARK_URI = "gs://{}/{}".format(BUCKET, PYSPARK_MAIN)
-
+SPARKR_MAIN = os.environ.get("SPARKR_MAIN", "hello_world.R")
+SPARKR_URI = "gs://{}/{}".format(BUCKET, SPARKR_MAIN)
 
 # Cluster definition
 CLUSTER = {
@@ -104,6 +105,12 @@
     "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
 }
 
+SPARKR_JOB = {
+    "reference": {"project_id": PROJECT_ID},
+    "placement": {"cluster_name": CLUSTER_NAME},
+    "sparkRJob": {"mainRFileUri": SPARKR_URI},
+}
+
 HIVE_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
@@ -157,6 +164,10 @@
         task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
     )
 
+    sparkr_task = DataprocSubmitJobOperator(
+        task_id="sparkr_task", job=SPARKR_JOB, location=REGION, project_id=PROJECT_ID
+    )
+
     hive_task = DataprocSubmitJobOperator(
         task_id="hive_task", job=HIVE_JOB, location=REGION, project_id=PROJECT_ID
     )
@@ -178,4 +189,5 @@
     scale_cluster >> spark_sql_task >> delete_cluster
     scale_cluster >> spark_task >> delete_cluster
     scale_cluster >> pyspark_task >> delete_cluster
+    scale_cluster >> sparkr_task >> delete_cluster
     scale_cluster >> hadoop_task >> delete_cluster
diff --git a/tests/providers/google/cloud/operators/test_dataproc_system.py b/tests/providers/google/cloud/operators/test_dataproc_system.py
index e6779a6baa9d..703addb85b6d 100644
--- a/tests/providers/google/cloud/operators/test_dataproc_system.py
+++ b/tests/providers/google/cloud/operators/test_dataproc_system.py
@@ -25,6 +25,8 @@
 BUCKET = os.environ.get("GCP_DATAPROC_BUCKET", "dataproc-system-tests")
 PYSPARK_MAIN = os.environ.get("PYSPARK_MAIN", "hello_world.py")
 PYSPARK_URI = "gs://{}/{}".format(BUCKET, PYSPARK_MAIN)
+SPARKR_MAIN = os.environ.get("SPARKR_MAIN", "hello_world.R")
+SPARKR_URI = "gs://{}/{}".format(BUCKET, SPARKR_MAIN)
 
 pyspark_file = """
 #!/usr/bin/python
@@ -35,16 +37,28 @@
 print(words)
 """
 
+sparkr_file = """
+#!/usr/bin/r
+sparkR.session()
+# Create the SparkDataFrame
+df <- as.DataFrame(faithful)
+head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
+"""
+
 
 @pytest.mark.backend("mysql", "postgres")
 @pytest.mark.credential_file(GCP_DATAPROC_KEY)
 class DataprocExampleDagsTest(GoogleSystemTest):
-
     @provide_gcp_context(GCP_DATAPROC_KEY)
     def setUp(self):
         super().setUp()
         self.create_gcs_bucket(BUCKET)
-        self.upload_content_to_gcs(lines=pyspark_file, bucket=PYSPARK_URI, filename=PYSPARK_MAIN)
+        self.upload_content_to_gcs(
+            lines=pyspark_file, bucket=PYSPARK_URI, filename=PYSPARK_MAIN
+        )
+        self.upload_content_to_gcs(
+            lines=sparkr_file, bucket=SPARKR_URI, filename=SPARKR_MAIN
+        )
 
     @provide_gcp_context(GCP_DATAPROC_KEY)
     def tearDown(self):
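
For readers who only need the SparkR piece, a minimal standalone DAG might look
like the sketch below. This is an illustrative sketch, not part of the patch:
it assumes an already running Dataproc cluster and an R script already staged
in GCS, the project, region, cluster and gs:// values are placeholders, and the
job dict uses the snake_case field names that the follow-up commit in this
series settles on.

    import os

    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
    from airflow.utils.dates import days_ago

    PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")  # placeholder
    REGION = os.environ.get("GCP_LOCATION", "europe-west1")  # placeholder
    CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "example-cluster")  # assumed to exist
    SPARKR_URI = "gs://example-bucket/hello_world.R"  # placeholder: R script already in GCS

    SPARKR_JOB = {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER_NAME},
        "spark_r_job": {"main_r_file_uri": SPARKR_URI},
    }

    with models.DAG(
        "example_sparkr_only", schedule_interval=None, start_date=days_ago(1)
    ) as dag:
        # Submit the R script as a SparkR job to the existing cluster.
        sparkr_task = DataprocSubmitJobOperator(
            task_id="sparkr_task", job=SPARKR_JOB, location=REGION, project_id=PROJECT_ID
        )

Unlike this sketch, the full example DAG in the patch also manages the cluster
itself (scaling it and deleting it after the jobs), which is usually what a
scheduled pipeline wants.
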
From 2549e1ecfcc25e725ffe8e9da6ee73bb7e7aa927 Mon Sep 17 00:00:00 2001
From: Roel Hogervorst CB <41999859+roelhogervorst@users.noreply.github.com>
Date: Wed, 15 Apr 2020 09:45:47 +0200
Subject: [PATCH 2/3] Update airflow/providers/google/cloud/example_dags/example_dataproc.py

Co-Authored-By: Tomek Urbaszek
---
 airflow/providers/google/cloud/example_dags/example_dataproc.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc.py b/airflow/providers/google/cloud/example_dags/example_dataproc.py
index 573a5a782cf0..b6e1070df72b 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataproc.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataproc.py
@@ -108,7 +108,7 @@
 SPARKR_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
-    "sparkRJob": {"mainRFileUri": SPARKR_URI},
+    "spark_r_job": {"main_r_file_uri": SPARKR_URI},
 }
 
 HIVE_JOB = {

From 933cd6e30d79d28ce8412fa5ef7ac0020af6bf21 Mon Sep 17 00:00:00 2001
From: "Roel M. Hogervorst"
Date: Wed, 15 Apr 2020 13:31:27 +0200
Subject: [PATCH 3/3] Make sure R file finds correct library

---
 .../providers/google/cloud/operators/test_dataproc_system.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/tests/providers/google/cloud/operators/test_dataproc_system.py b/tests/providers/google/cloud/operators/test_dataproc_system.py
index 703addb85b6d..863ad5b5ce09 100644
--- a/tests/providers/google/cloud/operators/test_dataproc_system.py
+++ b/tests/providers/google/cloud/operators/test_dataproc_system.py
@@ -39,6 +39,10 @@
 
 sparkr_file = """
 #!/usr/bin/r
+if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
+Sys.setenv(SPARK_HOME = "/home/spark")
+}
+library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
 sparkR.session()
 # Create the SparkDataFrame
 df <- as.DataFrame(faithful)
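
The system test stages the inline pyspark_file and sparkr_file strings into the
GCS bucket before the example DAG runs. Outside the test harness, the R script
has to be uploaded to the bucket yourself. One possible way, sketched here with
the google-cloud-storage client, where the project, bucket and file names are
placeholders and a local hello_world.R is assumed to exist:

    from google.cloud import storage

    # Placeholders: substitute your own project, bucket and script path.
    client = storage.Client(project="example-project")
    bucket = client.bucket("dataproc-system-tests")
    blob = bucket.blob("hello_world.R")
    blob.upload_from_filename("hello_world.R")  # local copy of the SparkR script
    print("Uploaded to gs://{}/{}".format(bucket.name, blob.name))

The example DAG then reads the script back through SPARKR_URI, which is built
from the same bucket and SPARKR_MAIN file name.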