Add support for Stored Procedure for Apache Spark (GoogleCloudPlatform#9793)

* implementation for hashicorp/terraform-provider-google#16953

* modified format in example

* modified tabs to spaces

* added a test for update and a test for coverage

* modified all resources in the spark options section

* modified connection id to avoid conflict
marblejenka authored and bskaplan committed Jan 17, 2024
1 parent 12e2773 commit ec8f642
Showing 5 changed files with 269 additions and 0 deletions.
82 changes: 82 additions & 0 deletions mmv1/products/bigquery/Routine.yaml
@@ -52,6 +52,27 @@ examples:
vars:
dataset_id: 'dataset_id'
routine_id: 'routine_id'
- !ruby/object:Provider::Terraform::Examples
name: 'big_query_routine_pyspark'
primary_resource_id: 'pyspark'
vars:
dataset_id: 'dataset_id'
connection_id: 'connection_id'
routine_id: 'routine_id'
- !ruby/object:Provider::Terraform::Examples
name: 'big_query_routine_pyspark_mainfile'
primary_resource_id: 'pyspark_mainfile'
vars:
dataset_id: 'dataset_id'
connection_id: 'connection_id'
routine_id: 'routine_id'
- !ruby/object:Provider::Terraform::Examples
name: 'big_query_routine_spark_jar'
primary_resource_id: 'spark_jar'
vars:
dataset_id: 'dataset_id'
connection_id: 'connection_id'
routine_id: 'routine_id'
properties:
- !ruby/object:Api::Type::NestedObject
name: routineReference
@@ -101,6 +122,9 @@ properties:
values:
- :SQL
- :JAVASCRIPT
- :PYTHON
- :JAVA
- :SCALA
- !ruby/object:Api::Type::Array
name: 'arguments'
description: Input/output argument of a function or a stored procedure.
@@ -201,3 +225,61 @@ properties:
- :DETERMINISM_LEVEL_UNSPECIFIED
- :DETERMINISTIC
- :NOT_DETERMINISTIC
- !ruby/object:Api::Type::NestedObject
name: 'sparkOptions'
description: |
Optional. If language is one of "PYTHON", "JAVA", or "SCALA", this field stores the options for the Spark stored procedure.
properties:
- !ruby/object:Api::Type::String
name: 'connection'
description: |
Fully qualified name of the user-provided Spark connection object.
Format: "projects/{projectId}/locations/{locationId}/connections/{connectionId}"
- !ruby/object:Api::Type::String
name: 'runtimeVersion'
description: Runtime version. If not specified, the default runtime version is used.
- !ruby/object:Api::Type::String
name: 'containerImage'
description: Custom container image for the runtime environment.
- !ruby/object:Api::Type::KeyValuePairs
name: "properties"
description: |
Configuration properties as a set of key/value pairs, which will be passed on to the Spark application.
For more information, see the Apache Spark documentation and the procedure option list.
An object containing a list of "key": value pairs. Example: { "name": "wrench", "mass": "1.3kg", "count": "3" }.
default_from_api: true
- !ruby/object:Api::Type::String
name: 'mainFileUri'
description: |
The main file/jar URI of the Spark application.
Exactly one of the definitionBody field and the mainFileUri field must be set for Python.
Exactly one of the mainClass and mainFileUri fields must be set for the Java/Scala language types.
- !ruby/object:Api::Type::Array
name: 'pyFileUris'
description: |
Python files to be placed on the PYTHONPATH for the PySpark application. Supported file types: .py, .egg, and .zip. For more information, see the Apache Spark documentation.
item_type: Api::Type::String
default_from_api: true
- !ruby/object:Api::Type::Array
name: 'jarUris'
description: |
JARs to include on the driver and executor CLASSPATH. For more information, see the Apache Spark documentation.
item_type: Api::Type::String
default_from_api: true
- !ruby/object:Api::Type::Array
name: 'fileUris'
description: |
Files to be placed in the working directory of each executor. For more information, see the Apache Spark documentation.
item_type: Api::Type::String
default_from_api: true
- !ruby/object:Api::Type::Array
name: 'archiveUris'
description: |
Archive files to be extracted into the working directory of each executor. For more information, see the Apache Spark documentation.
item_type: Api::Type::String
default_from_api: true
- !ruby/object:Api::Type::String
name: 'mainClass'
description: |
The fully qualified name of a class in jarUris, for example, com.example.wordcount.
Exactly one of the mainClass and mainFileUri fields must be set for the Java/Scala language types.
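
The connection field above takes the fully qualified connection name; the google_bigquery_connection resource's name attribute already resolves to that form, which is why the examples below reference it directly. As a minimal sketch, with assumed project and connection IDs, the hand-built equivalent would be:

locals {
  # Assumed IDs for illustration only; prefer google_bigquery_connection.<name>.name,
  # which yields this same "projects/{projectId}/locations/{locationId}/connections/{connectionId}" form.
  spark_connection_name = "projects/my-project/locations/US/connections/my-connection"
}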
41 changes: 41 additions & 0 deletions mmv1/templates/terraform/examples/big_query_routine_pyspark.tf.erb
@@ -0,0 +1,41 @@
resource "google_bigquery_dataset" "test" {
dataset_id = "<%= ctx[:vars]['dataset_id'] %>"
}

resource "google_bigquery_connection" "test" {
connection_id = "<%= ctx[:vars]['connection_id'] %>"
location = "US"
spark { }
}

resource "google_bigquery_routine" "<%= ctx[:primary_resource_id] %>" {
dataset_id = google_bigquery_dataset.test.dataset_id
routine_id = "<%= ctx[:vars]['routine_id'] %>"
routine_type = "PROCEDURE"
language = "PYTHON"
definition_body = <<-EOS
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
.option("table", "bigquery-public-data:samples.shakespeare") \
.load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
.option("writeMethod", "direct") \
.save("wordcount_dataset.wordcount_output")
EOS
spark_options {
connection = google_bigquery_connection.test.name
runtime_version = "2.1"
}
}
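
Once applied, such a routine can be invoked like any other stored procedure. A hedged sketch of a follow-up query job, assuming the rendered primary_resource_id of pyspark and an arbitrary job ID; the dispositions are cleared because a CALL script takes no destination table:

resource "google_bigquery_job" "call_pyspark" {
  job_id = "job_call_pyspark_routine" # assumed job ID

  query {
    # Invoke the stored procedure defined above.
    query              = "CALL `${google_bigquery_dataset.test.dataset_id}.${google_bigquery_routine.pyspark.routine_id}`()"
    create_disposition = ""
    write_disposition  = ""
  }
}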
25 changes: 25 additions & 0 deletions mmv1/templates/terraform/examples/big_query_routine_pyspark_mainfile.tf.erb
@@ -0,0 +1,25 @@
resource "google_bigquery_dataset" "test" {
dataset_id = "<%= ctx[:vars]['dataset_id'] %>"
}

resource "google_bigquery_connection" "test" {
connection_id = "<%= ctx[:vars]['connection_id'] %>"
location = "US"
spark { }
}

resource "google_bigquery_routine" "<%= ctx[:primary_resource_id] %>" {
dataset_id = google_bigquery_dataset.test.dataset_id
routine_id = "<%= ctx[:vars]['routine_id'] %>"
routine_type = "PROCEDURE"
language = "PYTHON"
definition_body = ""
spark_options {
connection = google_bigquery_connection.test.name
runtime_version = "2.1"
main_file_uri = "gs://test-bucket/main.py"
py_file_uris = ["gs://test-bucket/lib.py"]
file_uris = ["gs://test-bucket/distribute_in_executor.json"]
archive_uris = ["gs://test-bucket/distribute_in_executor.tar.gz"]
}
}
28 changes: 28 additions & 0 deletions mmv1/templates/terraform/examples/big_query_routine_spark_jar.tf.erb
@@ -0,0 +1,28 @@
resource "google_bigquery_dataset" "test" {
dataset_id = "<%= ctx[:vars]['dataset_id'] %>"
}

resource "google_bigquery_connection" "test" {
connection_id = "<%= ctx[:vars]['connection_id'] %>"
location = "US"
spark { }
}

resource "google_bigquery_routine" "<%= ctx[:primary_resource_id] %>" {
dataset_id = google_bigquery_dataset.test.dataset_id
routine_id = "<%= ctx[:vars]['routine_id'] %>"
routine_type = "PROCEDURE"
language = "SCALA"
definition_body = ""
spark_options {
connection = google_bigquery_connection.test.name
runtime_version = "2.1"
container_image = "gcr.io/my-project-id/my-spark-image:latest"
main_class = "com.google.test.jar.MainClass"
jar_uris = [ "gs://test-bucket/uberjar_spark_spark3.jar" ]
properties = {
"spark.dataproc.scaling.version" : "2",
"spark.reducer.fetchMigratedShuffle.enabled" : "true",
}
}
}
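
Per the mainClass note in the schema above, main_class and main_file_uri are mutually exclusive for Java/Scala routines. A hedged variant of the same procedure, with an assumed self-contained jar whose manifest declares the entry point:

resource "google_bigquery_routine" "spark_jar_mainfile" {
  dataset_id      = google_bigquery_dataset.test.dataset_id
  routine_id      = "spark_jar_mainfile_routine" # assumed routine ID
  routine_type    = "PROCEDURE"
  language        = "SCALA"
  definition_body = ""
  spark_options {
    connection      = google_bigquery_connection.test.name
    runtime_version = "2.1"
    # main_file_uri replaces main_class + jar_uris here; the jar's
    # manifest is assumed to name the main class.
    main_file_uri = "gs://test-bucket/self_contained_spark3.jar"
  }
}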
@@ -80,3 +80,96 @@ resource "google_bigquery_routine" "sproc" {
}
`, dataset, routine)
}

func TestAccBigQueryRoutine_bigQueryRoutineSparkJar_Update(t *testing.T) {
t.Parallel()

context := map[string]interface{}{
"random_suffix": acctest.RandString(t, 10),
}

acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckBigQueryRoutineDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccBigQueryRoutine_bigQueryRoutineSparkJar(context),
},
{
ResourceName: "google_bigquery_routine.spark_jar",
ImportState: true,
ImportStateVerify: true,
},
{
Config: testAccBigQueryRoutine_bigQueryRoutineSparkJar_Update(context),
},
{
ResourceName: "google_bigquery_routine.spark_jar",
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func testAccBigQueryRoutine_bigQueryRoutineSparkJar(context map[string]interface{}) string {
return acctest.Nprintf(`
resource "google_bigquery_dataset" "test" {
dataset_id = "tf_test_dataset_id%{random_suffix}"
}
resource "google_bigquery_connection" "test" {
connection_id = "tf_test_connection_id%{random_suffix}"
location = "US"
spark { }
}
resource "google_bigquery_routine" "spark_jar" {
dataset_id = google_bigquery_dataset.test.dataset_id
routine_id = "tf_test_routine_id%{random_suffix}"
routine_type = "PROCEDURE"
language = "SCALA"
definition_body = ""
spark_options {
connection = google_bigquery_connection.test.name
runtime_version = "2.0"
main_class = "com.google.test.jar.MainClass"
jar_uris = [ "gs://test-bucket/testjar_spark_spark3.jar" ]
}
}
`, context)
}

func testAccBigQueryRoutine_bigQueryRoutineSparkJar_Update(context map[string]interface{}) string {
return acctest.Nprintf(`
resource "google_bigquery_dataset" "test" {
dataset_id = "tf_test_dataset_id%{random_suffix}"
}
resource "google_bigquery_connection" "test_updated" {
connection_id = "tf_test_connection_updated_id%{random_suffix}"
location = "US"
spark { }
}
resource "google_bigquery_routine" "spark_jar" {
dataset_id = google_bigquery_dataset.test.dataset_id
routine_id = "tf_test_routine_id%{random_suffix}"
routine_type = "PROCEDURE"
language = "SCALA"
definition_body = ""
spark_options {
connection = google_bigquery_connection.test_updated.name
runtime_version = "2.1"
container_image = "gcr.io/my-project-id/my-spark-image:latest"
main_class = "com.google.test.jar.MainClassUpdated"
jar_uris = [ "gs://test-bucket/uberjar_spark_spark3_updated.jar" ]
properties = {
"spark.dataproc.scaling.version" : "2",
"spark.reducer.fetchMigratedShuffle.enabled" : "true",
}
}
}
`, context)
}
